This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch dev-2.1.6 in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 9c80377c88416c14e62c47231dda6d96167a5956 Author: benjobs <[email protected]> AuthorDate: Fri Jan 24 20:24:49 2025 +0800 [Improve] add AssertUtils --- .../apache/streampark/common/fs/LfsOperator.scala | 6 +- .../streampark/common/util/AssertUtils.scala | 300 +++++++++++++++++++++ .../apache/streampark/common/util/FileUtils.scala | 4 +- .../org/apache/streampark/common/util/Logger.scala | 2 +- .../streampark/common/util/PropertiesUtils.scala | 2 +- .../org/apache/streampark/common/util/Utils.scala | 41 +-- .../streampark/console/base/util/CommonUtils.java | 8 +- .../core/controller/ExternalLinkController.java | 4 +- .../streampark/console/core/entity/Project.java | 3 +- .../console/core/runner/EnvInitializer.java | 8 +- .../core/service/alert/impl/AlertServiceImpl.java | 4 +- .../core/service/impl/AppBuildPipeServiceImpl.java | 3 +- .../service/impl/ApplicationConfigServiceImpl.java | 4 +- .../core/service/impl/ApplicationServiceImpl.java | 7 +- .../core/service/impl/ExternalLinkServiceImpl.java | 8 +- .../core/service/impl/FlinkSqlServiceImpl.java | 5 +- .../core/service/impl/ProjectServiceImpl.java | 10 +- .../core/service/impl/SavepointServiceImpl.java | 11 +- .../core/service/impl/SqlCompleteServiceImpl.java | 4 +- .../core/service/impl/YarnQueueServiceImpl.java | 15 +- .../system/service/impl/MemberServiceImpl.java | 8 +- .../system/service/impl/UserServiceImpl.java | 8 +- .../connector/hbase/source/HBaseJavaSource.java | 6 +- .../flink/connector/jdbc/sink/JdbcJavaSink.java | 4 +- .../connector/jdbc/source/JdbcJavaSource.java | 6 +- .../flink/connector/jdbc/source/JdbcSource.scala | 16 +- .../connector/mongo/source/MongoJavaSource.java | 8 +- .../flink/packer/docker/DockerRetriever.scala | 4 +- 28 files changed, 392 insertions(+), 117 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala b/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala index ca6d50b25..313b06963 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala @@ -17,8 +17,8 @@ package org.apache.streampark.common.fs -import org.apache.streampark.common.util.Logger -import org.apache.streampark.common.util.Utils.{isAnyBank, notEmpty} +import org.apache.streampark.common.util.{AssertUtils, Logger} +import org.apache.streampark.common.util.Utils.isAnyBank import org.apache.commons.codec.digest.DigestUtils import org.apache.commons.io.{FileUtils, IOUtils} @@ -41,7 +41,7 @@ object LfsOperator extends FsOperator with Logger { } override def delete(path: String): Unit = { - if (notEmpty(path)) { + if (AssertUtils.isNotEmpty(path)) { val file = new File(path) if (file.exists()) { FileUtils.forceDelete(file) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala new file mode 100644 index 000000000..5d0eb5042 --- /dev/null +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.util + +import javax.annotation.Nullable + +import java.util +import java.util.{Collection => JavaCollection, Map => JavaMap} + +import scala.collection.JavaConversions._ + +/** @since 2.1.6 */ +object AssertUtils { + + /** + * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if the + * condition is not met (evaluates to {@code false}). + * + * @param condition + * The condition to check + * @throws IllegalArgumentException + * Thrown, if the condition is violated. + */ + def required(condition: Boolean): Unit = { + if (!condition) { + throw new IllegalArgumentException + } + } + + /** + * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if the + * condition is not met (evaluates to {@code false}). The exception will have the given error + * message. + * + * @param condition + * The condition to check + * @param message + * The message for the {@code IllegalArgumentException} that is thrown if the check fails. + * @throws IllegalArgumentException + * Thrown, if the condition is violated. + */ + def required(condition: Boolean, @Nullable message: String): Unit = { + if (!condition) { + throw new IllegalArgumentException(message) + } + } + + /** + * Checks the given boolean condition, and throws an {@code IllegalStateException} if the + * condition is not met (evaluates to {@code false}). + * + * @param condition + * The condition to check + * @throws IllegalStateException + * Thrown, if the condition is violated. + */ + def state(condition: Boolean): Unit = { + if (!condition) { + throw new IllegalStateException + } + } + + /** + * Checks the given boolean condition, and throws an IllegalStateException if the condition is not + * met (evaluates to {@code false}). The exception will have the given error message. + * + * @param condition + * The condition to check + * @param message + * The message for the IllegalStateException that is thrown if the check fails. + * @throws IllegalStateException + * Thrown, if the condition is violated. + */ + def state(condition: Boolean, @Nullable message: String): Unit = { + if (!condition) { + throw new IllegalStateException(message) + } + } + + // ------------------------------------------------------------------------ + // Null checks + // ------------------------------------------------------------------------ + /** Ensures that the given object reference is not null. Upon violation, a */ + def notNull[T](@Nullable reference: T): T = { + if (reference == null) { + throw new NullPointerException + } + reference + } + + /** + * Ensures that the given object reference is not null. Upon violation, a NullPointerException + * that is thrown if the check fails. + * + * @return + * The object reference itself (generically typed). + * @throws NullPointerException + * Thrown, if the passed reference was null. + */ + def notNull[T](@Nullable reference: T, @Nullable message: String): T = { + if (reference == null) { + throw new NullPointerException(message) + } + reference + } + + def isEmpty(reference: AnyRef): Boolean = !isNotEmpty(reference) + + def isNotEmpty(elem: AnyRef): Boolean = { + elem match { + case null => false + case x if x.isInstanceOf[Array[_]] => elem.asInstanceOf[Array[_]].nonEmpty + case x if x.isInstanceOf[CharSequence] => elem.toString.trim.nonEmpty + case x if x.isInstanceOf[Traversable[_]] => x.asInstanceOf[Traversable[_]].nonEmpty + case x if x.isInstanceOf[Iterable[_]] => x.asInstanceOf[Iterable[_]].nonEmpty + case x if x.isInstanceOf[JavaCollection[_]] => !x.asInstanceOf[JavaCollection[_]].isEmpty + case x if x.isInstanceOf[JavaMap[_, _]] => !x.asInstanceOf[JavaMap[_, _]].isEmpty + case _ => true + } + } + + /** + * Assert that an Array|CharSequence|JavaCollection|Map|Iterable... must not be {@code null} and + * must contain at least one element. <pre class="code">AssertUtils.notEmpty(array, "must be + * contain elements");</pre> + * + * @param reference + * the object to check + * @throws IllegalArgumentException + * if the object array is {@code null} or contains no elements + */ + def notEmpty(reference: AnyRef): Unit = { + if (isEmpty(reference)) { + throw new IllegalArgumentException() + } + } + + /** + * Assert that an Array|CharSequence|JavaCollection|Map|Iterable... must not be {@code null} and + * must contain at least one element. <pre class="code"> AssertUtils.notEmpty(array, "must be + * contain elements");</pre> + * + * @param reference + * the object to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the object array is {@code null} or contains no elements + */ + def notEmpty(@Nullable reference: AnyRef, message: String): Unit = { + if (isEmpty(reference)) { + throw new IllegalArgumentException(message) + } + } + + /** + * Assert that an array contains no {@code null} elements. <p>Note: Does not complain if the array + * is empty! <pre class="code">AssertUtils.noNullElements(array, "The array must contain non-null + * elements");</pre> + * + * @param array + * the array to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the object array contains a {@code null} element + */ + def noNullElements(@Nullable array: Array[AnyRef], message: String): Unit = { + if (array != null) for (element <- array) { + if (element == null) throw new IllegalArgumentException(message) + } + } + + /** + * Assert that a collection contains no {@code null} elements. <p>Note: Does not complain if the + * collection is empty! <pre class="code">AssertUtils.noNullElements(collection, "Collection must + * contain non-null elements");</pre> + * + * @param collection + * the collection to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the collection contains a {@code null} element + */ + def noNullElements(@Nullable collection: util.Collection[_], message: String): Unit = { + if (collection != null) for (element <- collection) { + if (element == null) { + throw new IllegalArgumentException(message) + } + } + } + + /** + * Assert that the given String is not empty; that is, it must not be {@code null} and not the + * empty String. <pre class="code">AssertUtils.hasLength(name, "Name must not be empty");</pre> + * + * @param text + * the String to check + * @throws IllegalArgumentException + * if the text is empty + * @see + * StringUtils#hasLength + */ + def hasLength(@Nullable text: String): Unit = { + if (!getHasLength(text)) { + throw new IllegalArgumentException() + } + } + + /** + * Assert that the given String is not empty; that is, it must not be {@code null} and not the + * empty String. <pre class="code">AssertUtils.hasLength(name, "Name must not be empty");</pre> + * + * @param text + * the String to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the text is empty + * @see + * StringUtils#hasLength + */ + def hasLength(@Nullable text: String, message: String): Unit = { + if (!getHasLength(text)) { + throw new IllegalArgumentException(message) + } + } + + /** + * Assert that the given String contains valid text content; that is, it must not be {@code null} + * and must contain at least one non-whitespace character. <pre + * class="code">AssertUtils.hasText(name, "'name' must not be empty");</pre> + * + * @param text + * the String to check + * @throws IllegalArgumentException + * if the text does not contain valid text content + * @see + * StringUtils#hasText + */ + def hasText(@Nullable text: String): Unit = { + if (!getHasText(text)) { + throw new IllegalArgumentException() + } + } + + /** + * Assert that the given String contains valid text content; that is, it must not be {@code null} + * and must contain at least one non-whitespace character. <pre + * class="code">AssertUtils.hasText(name, "'name' must not be empty");</pre> + * + * @param text + * the String to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the text does not contain valid text content + * @see + * StringUtils#hasText + */ + def hasText(@Nullable text: String, message: String): Unit = { + if (!getHasText(text)) { + throw new IllegalArgumentException(message) + } + } + + private[this] def getHasLength(@Nullable str: String): Boolean = + str != null && str.nonEmpty + + private[this] def getHasText(@Nullable str: String): Boolean = { + str != null && str.nonEmpty && containsText(str) + } + + private[this] def containsText(str: CharSequence): Boolean = { + val strLen = str.length + for (i <- 0 until strLen) { + if (!Character.isWhitespace(str.charAt(i))) { + return true + } + } + false + } + +} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala index d3eed9d2f..3090a72a7 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala @@ -92,8 +92,8 @@ object FileUtils { def getPathFromEnv(env: String): String = { val path = System.getenv(env) - require( - Utils.notEmpty(path), + AssertUtils.required( + AssertUtils.isNotEmpty(path), s"[StreamPark] FileUtils.getPathFromEnv: $env is not set on system env") val file = new File(path) require(file.exists(), s"[StreamPark] FileUtils.getPathFromEnv: $env is not exist!") diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala index 3ce4fd9a7..1c6daa983 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala @@ -134,7 +134,7 @@ private[this] object LoggerFactory extends LoggerFactoryBinder { private val shadedPackage = "org.apache.streampark.shaded" override def configureByResource(url: URL): Unit = { - Utils.notNull(url, "URL argument cannot be null") + AssertUtils.notNull(url, "URL argument cannot be null") val path = url.getPath if (path.endsWith("xml")) { val configurator = new JoranConfigurator() diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index 6adc03cf7..f839fa207 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -280,7 +280,7 @@ object PropertiesUtils extends Logger { val map = mutable.Map[String, String]() val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "") simple.split("\\s?-D") match { - case d if Utils.notEmpty(d) => + case d if AssertUtils.isNotEmpty(d) => d.foreach( x => { if (x.nonEmpty) { diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala index 2015a2c80..80a19c2af 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala @@ -18,11 +18,11 @@ package org.apache.streampark.common.util import org.apache.commons.lang3.StringUtils -import java.io.{BufferedInputStream, File, FileInputStream, IOException, PrintWriter, StringWriter} +import java.io._ import java.lang.{Boolean => JavaBool, Byte => JavaByte, Double => JavaDouble, Float => JavaFloat, Integer => JavaInt, Long => JavaLong, Short => JavaShort} import java.net.URL import java.time.Duration -import java.util.{jar, Collection => JavaCollection, Map => JavaMap, Properties, UUID} +import java.util.{jar, Properties, UUID} import java.util.concurrent.locks.LockSupport import java.util.jar.{JarFile, JarInputStream} @@ -34,43 +34,6 @@ object Utils extends Logger { private[this] lazy val OS = System.getProperty("os.name").toLowerCase - def notNull(obj: Any, message: String): Unit = { - if (obj == null) { - throw new NullPointerException(message) - } - } - - def notNull(obj: Any): Unit = { - notNull(obj, "this argument must not be null") - } - - def notEmpty(elem: Any): Boolean = { - elem match { - case null => false - case x if x.isInstanceOf[Array[_]] => elem.asInstanceOf[Array[_]].nonEmpty - case x if x.isInstanceOf[CharSequence] => elem.toString.trim.nonEmpty - case x if x.isInstanceOf[Traversable[_]] => x.asInstanceOf[Traversable[_]].nonEmpty - case x if x.isInstanceOf[Iterable[_]] => x.asInstanceOf[Iterable[_]].nonEmpty - case x if x.isInstanceOf[JavaCollection[_]] => !x.asInstanceOf[JavaCollection[_]].isEmpty - case x if x.isInstanceOf[JavaMap[_, _]] => !x.asInstanceOf[JavaMap[_, _]].isEmpty - case _ => true - } - } - - def isEmpty(elem: Any): Boolean = !notEmpty(elem) - - def required(expression: Boolean): Unit = { - if (!expression) { - throw new IllegalArgumentException - } - } - - def required(expression: Boolean, errorMessage: Any): Unit = { - if (!expression) { - throw new IllegalArgumentException(s"requirement failed: ${errorMessage.toString}") - } - } - def uuid(): String = UUID.randomUUID().toString.replaceAll("-", "") @throws[IOException] diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java index 524959322..47096a0ff 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.base.util; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.cglib.beans.BeanMap; @@ -451,7 +451,7 @@ public final class CommonUtils implements Serializable { } public static <T> T[] arrayRemoveElements(T[] array, T... elem) { - Utils.notNull(array); + AssertUtils.notNull(array); List<T> arrayList = new ArrayList<>(0); Collections.addAll(arrayList, array); if (isEmpty(elem)) { @@ -464,7 +464,7 @@ public final class CommonUtils implements Serializable { } public static <T> T[] arrayRemoveIndex(T[] array, int... index) { - Utils.notNull(array); + AssertUtils.notNull(array); for (int j : index) { if (j < 0 || j > array.length - 1) { throw new IndexOutOfBoundsException("index error.@" + j); @@ -481,7 +481,7 @@ public final class CommonUtils implements Serializable { } public static <T> T[] arrayInsertIndex(T[] array, int index, T t) { - Utils.notNull(array); + AssertUtils.notNull(array); List<T> arrayList = new ArrayList<T>(array.length + 1); if (index == 0) { arrayList.add(t); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java index 66cdb0bee..3818ec45f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.controller; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.core.entity.ExternalLink; import org.apache.streampark.console.core.service.ExternalLinkService; @@ -70,7 +70,7 @@ public class ExternalLinkController { @PostMapping("/update") @RequiresPermissions("externalLink:update") public RestResponse update(@Valid ExternalLink externalLink) { - Utils.notNull(externalLink.getId(), "The link id cannot be null"); + AssertUtils.notNull(externalLink.getId(), "The link id cannot be null"); externalLinkService.update(externalLink); return RestResponse.success(); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java index dc0f23c63..1694d9bdb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java @@ -20,6 +20,7 @@ package org.apache.streampark.console.core.entity; import org.apache.streampark.common.conf.CommonConfig; import org.apache.streampark.common.conf.InternalConfigHolder; import org.apache.streampark.common.conf.Workspace; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.util.CommonUtils; import org.apache.streampark.console.base.util.WebUtils; @@ -228,7 +229,7 @@ public class Project implements Serializable { try { Process process = Runtime.getRuntime().exec(mvn + " --version"); process.waitFor(); - Utils.required(process.exitValue() == 0); + AssertUtils.required(process.exitValue() == 0); useWrapper = false; } catch (Exception ignored) { log.warn("try using user-installed maven failed, now use maven-wrapper."); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java index d6359340b..aabde27cf 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java @@ -24,8 +24,8 @@ import org.apache.streampark.common.conf.InternalOption; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.StorageType; import org.apache.streampark.common.fs.FsOperator; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.SystemPropertyUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.bean.MavenConfig; import org.apache.streampark.console.core.entity.FlinkEnv; @@ -104,7 +104,7 @@ public class EnvInitializer implements ApplicationRunner { .forEach( key -> { InternalOption config = InternalConfigHolder.getConfig(key); - Utils.notNull(config); + AssertUtils.notNull(config); InternalConfigHolder.set(config, springEnv.getProperty(key, config.classType())); }); @@ -172,7 +172,7 @@ public class EnvInitializer implements ApplicationRunner { // 2. upload jar. // 2.1) upload client jar File client = WebUtils.getAppClientDir(); - Utils.required( + AssertUtils.required( client.exists() && client.listFiles().length > 0, client.getAbsolutePath().concat(" is not exists or empty directory ")); @@ -189,7 +189,7 @@ public class EnvInitializer implements ApplicationRunner { WebUtils.getAppLibDir() .listFiles(pathname -> pathname.getName().matches(PATTERN_FLINK_SHIMS_JAR.pattern())); - Utils.required(shims != null && shims.length > 0, "streampark-flink-shims jar not exist"); + AssertUtils.required(shims != null && shims.length > 0, "streampark-flink-shims jar not exist"); String appShims = workspace.APP_SHIMS(); fsOperator.delete(appShims); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java index b08b0d2cf..acb9b255b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java @@ -17,8 +17,8 @@ package org.apache.streampark.console.core.service.alert.impl; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.ThreadUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.exception.AlertException; import org.apache.streampark.console.base.util.SpringContextUtils; import org.apache.streampark.console.core.bean.AlertConfigWithParams; @@ -100,7 +100,7 @@ public class AlertServiceImpl implements AlertService { try { Class<? extends AlertNotifyService> notifyServiceClass = getAlertServiceImpl(alertType); - Utils.notNull(notifyServiceClass); + AssertUtils.notNull(notifyServiceClass); boolean alertRes = SpringContextUtils.getBean(notifyServiceClass) .doAlert(params, alertTemplate); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 1d4edd7cd..ad9b350fe 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -21,6 +21,7 @@ import org.apache.streampark.common.conf.ConfigConst; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.common.fs.FsOperator; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.FileUtils; import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.Utils; @@ -169,7 +170,7 @@ public class AppBuildPipeServiceImpl FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false); if (app.isFlinkSqlJob()) { FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql; - Utils.notNull(flinkSql); + AssertUtils.notNull(flinkSql); app.setDependency(flinkSql.getDependency()); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java index 8f3b8d05b..a4e1a91b9 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java @@ -17,8 +17,8 @@ package org.apache.streampark.console.core.service.impl; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.DeflaterUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; @@ -107,7 +107,7 @@ public class ApplicationConfigServiceImpl if (application.isFlinkSqlJob()) { // get effect config ApplicationConfig effectiveConfig = getEffective(application.getId()); - if (Utils.isEmpty(application.getConfig())) { + if (AssertUtils.isEmpty(application.getConfig())) { if (effectiveConfig != null) { effectiveService.delete(application.getId(), EffectiveType.CONFIG); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 6ed4df915..3d6823174 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -27,6 +27,7 @@ import org.apache.streampark.common.enums.ResolveOrder; import org.apache.streampark.common.enums.StorageType; import org.apache.streampark.common.fs.HdfsOperator; import org.apache.streampark.common.fs.LfsOperator; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.HadoopUtils; import org.apache.streampark.common.util.PropertiesUtils; @@ -1510,7 +1511,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli @Transactional(rollbackFor = {Exception.class}) public void start(Application appParam, boolean auto) throws Exception { final Application application = getById(appParam.getId()); - Utils.notNull(application); + AssertUtils.notNull(application); if (!application.isCanBeStart()) { throw new ApiAlertException("[StreamPark] The application cannot be started repeatedly."); } @@ -1605,7 +1606,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli } } else if (application.isFlinkSqlJob()) { FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false); - Utils.notNull(flinkSql); + AssertUtils.notNull(flinkSql); // 1) dist_userJar String sqlDistJar = serviceHelper.getSqlClientJar(flinkEnv); // 2) appConfig @@ -1632,7 +1633,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli } AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId()); - Utils.notNull(buildPipeline); + AssertUtils.notNull(buildPipeline); BuildResult buildResult = buildPipeline.getBuildResult(); if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java index b01aed743..ecfa29131 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.service.impl; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.ExternalLink; import org.apache.streampark.console.core.enums.PlaceholderType; @@ -75,7 +75,7 @@ public class ExternalLinkServiceImpl extends ServiceImpl<ExternalLinkMapper, Ext @Override public List<ExternalLink> render(Long appId) { Application app = applicationService.getById(appId); - Utils.notNull(app, "Application doesn't exist"); + AssertUtils.notNull(app, "Application doesn't exist"); List<ExternalLink> externalLink = this.list(); if (externalLink != null && !externalLink.isEmpty()) { // Render the placeholder @@ -109,10 +109,10 @@ public class ExternalLinkServiceImpl extends ServiceImpl<ExternalLinkMapper, Ext if (result == null) { return true; } - Utils.required( + AssertUtils.required( !result.getBadgeName().equals(params.getBadgeName()), String.format("The name: %s is already existing.", result.getBadgeName())); - Utils.required( + AssertUtils.required( !result.getLinkUrl().equals(params.getLinkUrl()), String.format("The linkUrl: %s is already existing.", result.getLinkUrl())); return false; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java index f03af2ae0..1633926aa 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.streampark.console.core.service.impl; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.RestRequest; @@ -172,11 +173,11 @@ public class FlinkSqlServiceImpl extends ServiceImpl<FlinkSqlMapper, FlinkSql> @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class) public void rollback(Application application) { FlinkSql sql = getCandidate(application.getId(), CandidateType.HISTORY); - Utils.notNull(sql); + AssertUtils.notNull(sql); try { // check and backup current job FlinkSql effectiveSql = getEffective(application.getId(), false); - Utils.notNull(effectiveSql); + AssertUtils.notNull(effectiveSql); // rollback history sql backUpService.rollbackFlinkSql(application, sql); } catch (Exception e) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java index 4101163eb..ba72d4331 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java @@ -20,9 +20,9 @@ package org.apache.streampark.console.core.service.impl; import org.apache.streampark.common.conf.CommonConfig; import org.apache.streampark.common.conf.InternalConfigHolder; import org.apache.streampark.common.conf.Workspace; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.CompletableFutureUtils; import org.apache.streampark.common.util.ThreadUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.ResponseCode; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.domain.RestResponse; @@ -122,7 +122,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> @Transactional(rollbackFor = {Exception.class}) public boolean update(Project projectParam) { Project project = getById(projectParam.getId()); - Utils.notNull(project); + AssertUtils.notNull(project); ApiAlertException.throwIfFalse( project.getTeamId().equals(projectParam.getTeamId()), "Team can't be changed, update project failed."); @@ -166,7 +166,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> @Transactional(rollbackFor = {Exception.class}) public boolean delete(Long id) { Project project = getById(id); - Utils.notNull(project); + AssertUtils.notNull(project); LambdaQueryWrapper<Application> queryWrapper = new LambdaQueryWrapper<Application>().eq(Application::getProjectId, id); long count = applicationService.count(queryWrapper); @@ -246,7 +246,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> @Override public List<String> modules(Long id) { Project project = getById(id); - Utils.notNull(project); + AssertUtils.notNull(project); BuildState buildState = BuildState.of(project.getBuildState()); if (BuildState.SUCCESSFUL.equals(buildState)) { File appHome = project.getDistHome(); @@ -373,7 +373,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> } List<Map<String, Object>> list = new ArrayList<>(); File[] files = unzipFile.listFiles(x -> "conf".equals(x.getName())); - Utils.notNull(files); + AssertUtils.notNull(files); for (File item : files) { eachFile(item, list, true); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java index 9dae7dc84..449676fdb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java @@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service.impl; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ExecutionMode; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.CompletableFutureUtils; import org.apache.streampark.common.util.PropertiesUtils; import org.apache.streampark.common.util.ThreadUtils; @@ -123,8 +124,8 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint private void clearExpire(Savepoint entity) { FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId()); Application application = applicationService.getById(entity.getAppId()); - Utils.notNull(flinkEnv); - Utils.notNull(application); + AssertUtils.notNull(flinkEnv); + AssertUtils.notNull(application); String numRetainedKey = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(); String numRetainedFromDynamicProp = @@ -277,7 +278,7 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint // 3.1) At the remote mode, request the flink webui interface to get the savepoint path if (ExecutionMode.isRemoteMode(application.getExecutionMode())) { FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId()); - Utils.notNull( + AssertUtils.notNull( cluster, String.format( "The clusterId=%s cannot be find, maybe the clusterId is wrong or " @@ -440,7 +441,7 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint Map<String, Object> properties = new HashMap<>(); if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) { - Utils.notNull( + AssertUtils.notNull( cluster, String.format( "The clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.", @@ -461,7 +462,7 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint return application.getJobName(); case KUBERNETES_NATIVE_SESSION: case YARN_SESSION: - Utils.notNull( + AssertUtils.notNull( cluster, String.format( "The %s clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.", diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java index f01b5df58..e9997af41 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.service.impl; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.core.service.SqlCompleteService; import com.google.common.collect.Sets; @@ -185,7 +185,7 @@ public class SqlCompleteServiceImpl implements SqlCompleteService { nowStep = nowStep.get(nowChar).getNext(); loc += 1; } - Utils.notNull(preNode); + AssertUtils.notNull(preNode); preNode.setStop(); preNode.setCount(count); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java index 22d4e89ad..d8ea8714e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.core.service.impl; import org.apache.streampark.common.enums.ExecutionMode; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; @@ -74,8 +74,9 @@ public class YarnQueueServiceImpl extends ServiceImpl<YarnQueueMapper, YarnQueue @Override public IPage<YarnQueue> page(YarnQueue yarnQueue, RestRequest request) { - Utils.notNull(yarnQueue, "Yarn queue query params mustn't be null."); - Utils.notNull(yarnQueue.getTeamId(), "Team id of yarn queue query params mustn't be null."); + AssertUtils.notNull(yarnQueue, "Yarn queue query params mustn't be null."); + AssertUtils.notNull( + yarnQueue.getTeamId(), "Team id of yarn queue query params mustn't be null."); Page<YarnQueue> page = MybatisPager.getPage(request); return this.baseMapper.findQueues(page, yarnQueue); } @@ -88,8 +89,8 @@ public class YarnQueueServiceImpl extends ServiceImpl<YarnQueueMapper, YarnQueue @Override public ResponseResult<String> checkYarnQueue(YarnQueue yarnQueue) { - Utils.notNull(yarnQueue, "Yarn queue mustn't be empty."); - Utils.notNull(yarnQueue.getTeamId(), "Team id mustn't be null."); + AssertUtils.notNull(yarnQueue, "Yarn queue mustn't be empty."); + AssertUtils.notNull(yarnQueue.getTeamId(), "Team id mustn't be null."); ResponseResult<String> responseResult = new ResponseResult<>(); @@ -211,8 +212,8 @@ public class YarnQueueServiceImpl extends ServiceImpl<YarnQueueMapper, YarnQueue @VisibleForTesting public YarnQueue getYarnQueueByIdWithPreconditions(YarnQueue yarnQueue) { - Utils.notNull(yarnQueue, "Yarn queue mustn't be null."); - Utils.notNull(yarnQueue.getId(), "Yarn queue id mustn't be null."); + AssertUtils.notNull(yarnQueue, "Yarn queue mustn't be null."); + AssertUtils.notNull(yarnQueue.getId(), "Yarn queue id mustn't be null."); YarnQueue queueFromDB = getById(yarnQueue.getId()); ApiAlertException.throwIfNull(queueFromDB, "The queue doesn't exist."); return queueFromDB; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java index 3e95f7e5e..ae7d19532 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.system.service.impl; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; @@ -173,8 +173,10 @@ public class MemberServiceImpl extends ServiceImpl<MemberMapper, Member> impleme () -> new ApiAlertException( String.format("The member [id=%s] not found", member.getId()))); - Utils.required(oldMember.getTeamId().equals(member.getTeamId()), "Team id cannot be changed."); - Utils.required(oldMember.getUserId().equals(member.getUserId()), "User id cannot be changed."); + AssertUtils.required( + oldMember.getTeamId().equals(member.getTeamId()), "Team id cannot be changed."); + AssertUtils.required( + oldMember.getUserId().equals(member.getUserId()), "User id cannot be changed."); Optional.ofNullable(roleService.getById(member.getRoleId())) .orElseThrow( () -> diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java index 584f157a1..69d6c74f3 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.system.service.impl; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; @@ -79,7 +79,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us public IPage<User> page(User user, RestRequest request) { Page<User> page = MybatisPager.getPage(request); IPage<User> resPage = this.baseMapper.findUserDetail(page, user); - Utils.notNull(resPage); + AssertUtils.notNull(resPage); if (resPage.getTotal() == 0) { resPage.setRecords(Collections.emptyList()); } @@ -196,7 +196,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us @Override public void setLastTeam(Long teamId, Long userId) { User user = getById(userId); - Utils.notNull(user); + AssertUtils.notNull(user); user.setLastTeamId(teamId); this.baseMapper.updateById(user); } @@ -204,7 +204,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us @Override public void clearLastTeam(Long userId, Long teamId) { User user = getById(userId); - Utils.notNull(user); + AssertUtils.notNull(user); if (!teamId.equals(user.getLastTeamId())) { return; } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java index 4c6144a36..b146d281b 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.hbase.source; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.flink.connector.function.RunningFunction; import org.apache.streampark.flink.connector.hbase.function.HBaseQueryFunction; import org.apache.streampark.flink.connector.hbase.function.HBaseResultFunction; @@ -42,8 +42,8 @@ public class HBaseJavaSource<T> { HBaseResultFunction<T> resultFunction, RunningFunction runningFunc) { - Utils.notNull(queryFunction, "queryFunction must not be null"); - Utils.notNull(resultFunction, "resultFunction must not be null"); + AssertUtils.notNull(queryFunction, "queryFunction must not be null"); + AssertUtils.notNull(resultFunction, "resultFunction must not be null"); HBaseSourceFunction<T> sourceFunction = new HBaseSourceFunction<>(property, queryFunction, resultFunction, runningFunc, null); return context.getJavaEnv().addSource(sourceFunction); diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java index 8f9634978..636ddf5d3 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java @@ -17,8 +17,8 @@ package org.apache.streampark.flink.connector.jdbc.sink; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.ConfigUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.flink.connector.function.TransformFunction; import org.apache.streampark.flink.connector.jdbc.internal.JdbcSinkFunction; import org.apache.streampark.flink.core.scala.StreamingContext; @@ -55,7 +55,7 @@ public class JdbcJavaSink<T> { } public DataStreamSink<T> sink(DataStream<T> dataStream) { - Utils.notNull(sqlFunc, "transformFunction can not be null"); + AssertUtils.notNull(sqlFunc, "transformFunction can not be null"); if (this.jdbc == null) { this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(), alias); } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java index e3f36d34c..083f6d44d 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java @@ -17,8 +17,8 @@ package org.apache.streampark.flink.connector.jdbc.source; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.ConfigUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.flink.connector.function.RunningFunction; import org.apache.streampark.flink.connector.function.SQLQueryFunction; import org.apache.streampark.flink.connector.function.SQLResultFunction; @@ -54,8 +54,8 @@ public class JdbcJavaSource<T> { SQLResultFunction<T> resultFunction, RunningFunction runningFunc) { - Utils.notNull(queryFunction, "queryFunction must not be null"); - Utils.notNull(resultFunction, "resultFunction must not be null"); + AssertUtils.notNull(queryFunction, "queryFunction must not be null"); + AssertUtils.notNull(resultFunction, "resultFunction must not be null"); if (this.jdbc == null) { this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(), alias); } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala index 0e2cab882..6186711ae 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.jdbc.source -import org.apache.streampark.common.util.Utils +import org.apache.streampark.common.util.ConfigUtils import org.apache.streampark.flink.connector.jdbc.internal.JdbcSourceFunction import org.apache.streampark.flink.core.scala.StreamingContext @@ -31,14 +31,15 @@ import scala.collection.Map object JdbcSource { - def apply(@(transient @param) property: Properties = new Properties())(implicit - ctx: StreamingContext): JdbcSource = new JdbcSource(ctx, property) + def apply(alias: String = "", properties: Properties = new Properties())(implicit + ctx: StreamingContext): JdbcSource = new JdbcSource(ctx, alias, properties) {} } class JdbcSource( @(transient @param) val ctx: StreamingContext, - property: Properties = new Properties()) { + alias: String, + property: Properties) { /** * @param sqlFun @@ -50,8 +51,11 @@ class JdbcSource( def getDataStream[R: TypeInformation]( sqlFun: R => String, fun: Iterable[Map[String, _]] => Iterable[R], - running: Unit => Boolean)(implicit jdbc: Properties = new Properties()): DataStream[R] = { - Utils.copyProperties(property, jdbc) + running: Unit => Boolean): DataStream[R] = { + val jdbc = ConfigUtils.getJdbcProperties(ctx.parameter.toMap, alias) + if (property != null) { + jdbc.putAll(property) + } val mysqlFun = new JdbcSourceFunction[R](jdbc, sqlFun, fun, running) ctx.addSource(mysqlFun) } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java index dc49f621d..560de9eb0 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.mongo.source; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.flink.connector.function.RunningFunction; import org.apache.streampark.flink.connector.mongo.function.MongoQueryFunction; import org.apache.streampark.flink.connector.mongo.function.MongoResultFunction; @@ -43,9 +43,9 @@ public class MongoJavaSource<T> { MongoResultFunction<T> resultFunction, RunningFunction runningFunc) { - Utils.notNull(collectionName, "collectionName must not be null"); - Utils.notNull(queryFunction, "queryFunction must not be null"); - Utils.notNull(resultFunction, "resultFunction must not be null"); + AssertUtils.notNull(collectionName, "collectionName must not be null"); + AssertUtils.notNull(queryFunction, "queryFunction must not be null"); + AssertUtils.notNull(resultFunction, "resultFunction must not be null"); MongoSourceFunction<T> sourceFunction = new MongoSourceFunction<>( collectionName, property, queryFunction, resultFunction, runningFunc, null); diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala index 795db7d09..61663845a 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala @@ -18,7 +18,7 @@ package org.apache.streampark.flink.packer.docker import org.apache.streampark.common.conf.{CommonConfig, InternalConfigHolder} -import org.apache.streampark.common.util.Utils +import org.apache.streampark.common.util.AssertUtils import com.github.dockerjava.api.DockerClient import com.github.dockerjava.core.{DefaultDockerClientConfig, DockerClientConfig, HackDockerClient} @@ -61,7 +61,7 @@ object DockerRetriever { /** set docker-host for kata */ def setDockerHost(): Unit = { val dockerhost: String = InternalConfigHolder.get(CommonConfig.DOCKER_HOST) - if (Utils.notEmpty(dockerhost)) { + if (AssertUtils.isNotEmpty(dockerhost)) { val dockerHostUri: URI = new URI(dockerhost) dockerHttpClientBuilder.dockerHost(dockerHostUri) }
