This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch assert_util in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 017addd5e201270adae14713f5e576ffc559bde0 Author: benjobs <[email protected]> AuthorDate: Sat Mar 23 21:30:08 2024 +0800 [Improve] AssertUtils add and check not null improvements --- .../streampark/common/util/ExceptionUtils.java | 65 ----- .../apache/streampark/common/fs/LfsOperator.scala | 4 +- .../streampark/common/util/AssertUtils.scala | 282 +++++++++++++++++++++ .../streampark/common/util/ExceptionUtils.scala | 65 +++++ .../apache/streampark/common/util/FileUtils.scala | 7 +- .../org/apache/streampark/common/util/Logger.scala | 2 +- .../streampark/common/util/PropertiesUtils.scala | 2 +- .../org/apache/streampark/common/util/Utils.scala | 26 +- .../apache/streampark/common/util/UtilsTest.scala | 20 +- .../streampark/console/core/bean/Dependency.java | 2 +- .../core/component/FlinkCheckpointProcessor.java | 8 +- .../core/controller/ExternalLinkController.java | 4 +- .../streampark/console/core/entity/Project.java | 73 +++--- .../console/core/runner/EnvInitializer.java | 8 +- .../impl/ApplicationActionServiceImpl.java | 8 +- .../core/service/impl/AppBuildPipeServiceImpl.java | 4 +- .../core/service/impl/ExternalLinkServiceImpl.java | 8 +- .../core/service/impl/FlinkSqlServiceImpl.java | 6 +- .../core/service/impl/ProjectServiceImpl.java | 10 +- .../core/service/impl/SavePointServiceImpl.java | 12 +- .../core/service/impl/SqlCompleteServiceImpl.java | 6 +- .../core/service/impl/YarnQueueServiceImpl.java | 14 +- .../system/service/impl/MemberServiceImpl.java | 8 +- .../system/service/impl/UserServiceImpl.java | 8 +- .../console/SpringIntegrationTestBase.java | 11 +- .../flink/packer/docker/DockerRetriever.scala | 2 +- .../flink/FlinkStandaloneSessionCluster.java | 14 +- 27 files changed, 476 insertions(+), 203 deletions(-) diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java deleted file mode 100644 index 22d93e52b..000000000 --- a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.common.util; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; - -/** Utils to process exception message. */ -public class ExceptionUtils { - - private ExceptionUtils() {} - - /** - * Stringify the exception object. - * - * @param throwable the target exception to stringify. - * @return the result of string-exception. - */ - @Nonnull - public static String stringifyException(@Nullable Throwable throwable) { - if (throwable == null) { - return "(null)"; - } - try (StringWriter stm = new StringWriter(); - PrintWriter writer = new PrintWriter(stm)) { - throwable.printStackTrace(writer); - return stm.toString(); - } catch (IOException e) { - return e.getClass().getName() + " (error while printing stack trace)"; - } - } - - @FunctionalInterface - public interface WrapperRuntimeExceptionHandler<I, O> { - O handle(I input) throws Exception; - } - - public static <I, O> O wrapRuntimeException( - I input, WrapperRuntimeExceptionHandler<I, O> handler) { - try { - return handler.handle(input); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala b/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala index 8e4263591..1fce4302a 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala @@ -18,7 +18,7 @@ package org.apache.streampark.common.fs import org.apache.streampark.common.util.Logger -import org.apache.streampark.common.util.Utils.{isAnyBank, requireNotEmpty} +import org.apache.streampark.common.util.Utils.{isAnyBank, isNotEmpty} import org.apache.commons.codec.digest.DigestUtils import org.apache.commons.io.{FileUtils, IOUtils} @@ -41,7 +41,7 @@ object LfsOperator extends FsOperator with Logger { } override def delete(path: String): Unit = { - if (requireNotEmpty(path)) { + if (isNotEmpty(path)) { val file = new File(path) if (file.exists()) { FileUtils.forceDelete(file) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala new file mode 100644 index 000000000..48259cd96 --- /dev/null +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.streampark.common.util + +import org.apache.streampark.common.util.Utils.isEmpty + +import javax.annotation.Nullable + +import java.util + +import scala.collection.convert.ImplicitConversions._ + +/** @since 2.2.0 */ +object AssertUtils { + + /** + * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if the + * condition is not met (evaluates to {@code false}). + * + * @param condition + * The condition to check + * @throws IllegalArgumentException + * Thrown, if the condition is violated. + */ + def required(condition: Boolean): Unit = { + if (!condition) { + throw new IllegalArgumentException + } + } + + /** + * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if the + * condition is not met (evaluates to {@code false}). The exception will have the given error + * message. + * + * @param condition + * The condition to check + * @param message + * The message for the {@code IllegalArgumentException} that is thrown if the check fails. + * @throws IllegalArgumentException + * Thrown, if the condition is violated. + */ + def required(condition: Boolean, @Nullable message: String): Unit = { + if (!condition) { + throw new IllegalArgumentException(message) + } + } + + /** + * Checks the given boolean condition, and throws an {@code IllegalStateException} if the + * condition is not met (evaluates to {@code false}). + * + * @param condition + * The condition to check + * @throws IllegalStateException + * Thrown, if the condition is violated. + */ + def state(condition: Boolean): Unit = { + if (!condition) { + throw new IllegalStateException + } + } + + /** + * Checks the given boolean condition, and throws an IllegalStateException if the condition is not + * met (evaluates to {@code false}). The exception will have the given error message. + * + * @param condition + * The condition to check + * @param message + * The message for the IllegalStateException that is thrown if the check fails. + * @throws IllegalStateException + * Thrown, if the condition is violated. + */ + def state(condition: Boolean, @Nullable message: String): Unit = { + if (!condition) { + throw new IllegalStateException(message) + } + } + + // ------------------------------------------------------------------------ + // Null checks + // ------------------------------------------------------------------------ + /** Ensures that the given object reference is not null. Upon violation, a */ + def notNull[T](@Nullable reference: T): T = { + if (reference == null) { + throw new NullPointerException + } + reference + } + + /** + * Ensures that the given object reference is not null. Upon violation, a NullPointerException + * that is thrown if the check fails. + * + * @return + * The object reference itself (generically typed). + * @throws NullPointerException + * Thrown, if the passed reference was null. + */ + def notNull[T](@Nullable reference: T, @Nullable message: String): T = { + if (reference == null) { + throw new NullPointerException(message) + } + reference + } + + /** + * Assert that an Array|CharSequence|JavaCollection|Map|Iterable... must not be {@code null} and + * must contain at least one element. <pre class="code">AssertUtils.notEmpty(array, "must be + * contain elements");</pre> + * + * @param reference + * the object to check + * @throws IllegalArgumentException + * if the object array is {@code null} or contains no elements + */ + def notEmpty(reference: AnyRef): Unit = { + if (Utils.isEmpty(reference)) { + throw new IllegalArgumentException() + } + } + + /** + * Assert that an Array|CharSequence|JavaCollection|Map|Iterable... must not be {@code null} and + * must contain at least one element. <pre class="code"> AssertUtils.notEmpty(array, "must be + * contain elements");</pre> + * + * @param reference + * the object to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the object array is {@code null} or contains no elements + */ + def notEmpty(@Nullable reference: AnyRef, message: String): Unit = { + if (isEmpty(reference)) { + throw new IllegalArgumentException(message) + } + } + + /** + * Assert that an array contains no {@code null} elements. <p>Note: Does not complain if the array + * is empty! <pre class="code">AssertUtils.noNullElements(array, "The array must contain non-null + * elements");</pre> + * + * @param array + * the array to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the object array contains a {@code null} element + */ + def noNullElements(@Nullable array: Array[AnyRef], message: String): Unit = { + if (array != null) for (element <- array) { + if (element == null) throw new IllegalArgumentException(message) + } + } + + /** + * Assert that a collection contains no {@code null} elements. <p>Note: Does not complain if the + * collection is empty! <pre class="code">AssertUtils.noNullElements(collection, "Collection must + * contain non-null elements");</pre> + * + * @param collection + * the collection to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the collection contains a {@code null} element + */ + def noNullElements(@Nullable collection: util.Collection[_], message: String): Unit = { + if (collection != null) { + for (element <- collection) { + if (element == null) { + throw new IllegalArgumentException(message) + } + } + } + } + + /** + * Assert that the given String is not empty; that is, it must not be {@code null} and not the + * empty String. <pre class="code">AssertUtils.hasLength(name, "Name must not be empty");</pre> + * + * @param text + * the String to check + * @throws IllegalArgumentException + * if the text is empty + * @see + * StringUtils#hasLength + */ + def hasLength(@Nullable text: String): Unit = { + if (!getHasLength(text)) { + throw new IllegalArgumentException() + } + } + + /** + * Assert that the given String is not empty; that is, it must not be {@code null} and not the + * empty String. <pre class="code">AssertUtils.hasLength(name, "Name must not be empty");</pre> + * + * @param text + * the String to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the text is empty + * @see + * StringUtils#hasLength + */ + def hasLength(@Nullable text: String, message: String): Unit = { + if (!getHasLength(text)) throw new IllegalArgumentException(message) + } + + /** + * Assert that the given String contains valid text content; that is, it must not be {@code null} + * and must contain at least one non-whitespace character. <pre + * class="code">AssertUtils.hasText(name, "'name' must not be empty");</pre> + * + * @param text + * the String to check + * @throws IllegalArgumentException + * if the text does not contain valid text content + * @see + * StringUtils#hasText + */ + def hasText(@Nullable text: String): Unit = { + if (!getHasText(text)) { + throw new IllegalArgumentException() + } + } + + /** + * Assert that the given String contains valid text content; that is, it must not be {@code null} + * and must contain at least one non-whitespace character. <pre + * class="code">AssertUtils.hasText(name, "'name' must not be empty");</pre> + * + * @param text + * the String to check + * @param message + * the exception message to use if the assertion fails + * @throws IllegalArgumentException + * if the text does not contain valid text content + * @see + * StringUtils#hasText + */ + def hasText(@Nullable text: String, message: String): Unit = { + if (!getHasText(text)) throw new IllegalArgumentException(message) + } + + private[this] def getHasLength(@Nullable str: String): Boolean = str != null && !str.isEmpty + + private[this] def getHasText(@Nullable str: String): Boolean = { + str != null && str.nonEmpty && containsText(str) + } + + private[this] def containsText(str: CharSequence): Boolean = { + val strLen = str.length + for (i <- 0 until strLen) { + if (!Character.isWhitespace(str.charAt(i))) { + return true + } + } + false + } + +} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ExceptionUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ExceptionUtils.scala new file mode 100644 index 000000000..a382272a0 --- /dev/null +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ExceptionUtils.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.util + +import javax.annotation.Nonnull +import javax.annotation.Nullable + +import java.io.PrintWriter +import java.io.StringWriter + +object ExceptionUtils { + + /** + * Stringify the exception object. + * + * @param throwable + * the target exception to stringify. + * @return + * the result of string-exception. + */ + @Nonnull def stringifyException(@Nullable throwable: Throwable): String = { + if (throwable == null) { + return "(null)" + } + val stm = new StringWriter() + val writer = new PrintWriter(stm) + try { + throwable.printStackTrace(writer); + stm.toString + } catch { + case e: Exception => s"${e.getClass.getName} (error while printing stack trace)"; + case _ => null + } finally { + Utils.close(writer, stm) + } + } + + @FunctionalInterface trait WrapperRuntimeExceptionHandler[I, O] { + @throws[Exception] + def handle(input: I): O + } + + def wrapRuntimeException[I, O](input: I, handler: WrapperRuntimeExceptionHandler[I, O]): O = { + try handler.handle(input) + catch { + case e: Exception => + throw new RuntimeException(e) + } + } +} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala index 57cf5e55a..e8fe7ef51 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala @@ -92,9 +92,10 @@ object FileUtils { def getPathFromEnv(env: String): String = { val path = Option(System.getenv(env)).getOrElse(System.getProperty(env)) - require( - Utils.requireNotEmpty(path), - s"[StreamPark] FileUtils.getPathFromEnv: $env is not set on system env") + AssertUtils.notNull( + path, + s"[StreamPark] FileUtils.getPathFromEnv: $env is not set on system env" + ) val file = new File(path) require(file.exists(), s"[StreamPark] FileUtils.getPathFromEnv: $env is not exist!") file.getAbsolutePath diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala index faf1fc97c..ac097b191 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala @@ -134,7 +134,7 @@ private[this] object LoggerFactory extends LoggerFactoryBinder { val shadedPackage = "org.apache.streampark.shaded" override def configureByResource(url: URL): Unit = { - Utils.requireNotNull(url, "URL argument cannot be null") + AssertUtils.notNull(url, "URL argument cannot be null") val path = url.getPath if (path.endsWith("xml")) { val configurator = new JoranConfigurator() diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index 541f57f1b..e3a0ae547 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -281,7 +281,7 @@ object PropertiesUtils extends Logger { val map = mutable.Map[String, String]() val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "") simple.split("\\s?-D") match { - case d if Utils.requireNotEmpty(d) => + case d if Utils.isNotEmpty(d) => d.foreach( x => { if (x.nonEmpty) { diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala index fcba212d7..ed27537a8 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala @@ -33,17 +33,7 @@ object Utils extends Logger { private[this] lazy val OS = System.getProperty("os.name").toLowerCase - def requireNotNull(obj: Any, message: String): Unit = { - if (obj == null) { - throw new NullPointerException(message) - } - } - - def requireNotNull(obj: Any): Unit = { - requireNotNull(obj, "this argument must not be null") - } - - def requireNotEmpty(elem: Any): Boolean = { + def isNotEmpty(elem: Any): Boolean = { elem match { case null => false case x if x.isInstanceOf[Array[_]] => elem.asInstanceOf[Array[_]].nonEmpty @@ -56,19 +46,7 @@ object Utils extends Logger { } } - def isEmpty(elem: Any): Boolean = !requireNotEmpty(elem) - - def required(expression: Boolean): Unit = { - if (!expression) { - throw new IllegalArgumentException - } - } - - def required(expression: Boolean, errorMessage: Any): Unit = { - if (!expression) { - throw new IllegalArgumentException(s"Requirement failed: ${errorMessage.toString}") - } - } + def isEmpty(elem: Any): Boolean = !isNotEmpty(elem) def uuid(): String = UUID.randomUUID().toString.replaceAll("-", "") diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/UtilsTest.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/UtilsTest.scala index afc5f04e3..5f7a5c81e 100644 --- a/streampark-common/src/test/scala/org/apache/streampark/common/util/UtilsTest.scala +++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/UtilsTest.scala @@ -30,31 +30,31 @@ class UtilsTest extends AnyFunSuite { test("requiredNotNull should throw NullPointerException if argument is null") { val nullPointerException = intercept[NullPointerException] { - Utils.requireNotNull(null, "object can't be null") + AssertUtils.notNull(null, "object can't be null") } assert(nullPointerException.getMessage == "object can't be null") } test("requireNotEmpty should check if argument is not empty") { - assert(!Utils.requireNotEmpty(null)) - assert(Utils.requireNotEmpty(Array(1))) - assert(Utils.requireNotEmpty("string")) - assert(Utils.requireNotEmpty(Seq("Seq"))) - assert(Utils.requireNotEmpty(Iterable("Iterable"))) + assert(!Utils.isNotEmpty(null)) + assert(Utils.isNotEmpty(Array(1))) + assert(Utils.isNotEmpty("string")) + assert(Utils.isNotEmpty(Seq("Seq"))) + assert(Utils.isNotEmpty(Iterable("Iterable"))) val arrayList = new util.ArrayList[String](16) arrayList.add("arrayList") - assert(Utils.requireNotEmpty(arrayList)) + assert(Utils.isNotEmpty(arrayList)) val hashMap = new util.HashMap[String, String](16) hashMap.put("hash", "map") - assert(Utils.requireNotEmpty(hashMap)) - assert(Utils.requireNotEmpty()) + assert(Utils.isNotEmpty(hashMap)) + assert(Utils.isNotEmpty()) } test("required should throw IllegalArgumentException if condition is false") { val illegalArgumentException = intercept[IllegalArgumentException] { - Utils.required(false) + AssertUtils.required(false) } assert(illegalArgumentException.getMessage == null) } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java index d9eb147c9..e93737d20 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java @@ -43,7 +43,7 @@ public class Dependency { @SneakyThrows public static Dependency toDependency(String dependency) { - if (Utils.requireNotEmpty(dependency)) { + if (Utils.isNotEmpty(dependency)) { return JacksonUtils.read(dependency, new TypeReference<Dependency>() {}); } return new Dependency(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java index f3e713e5c..ab45f2d13 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java @@ -17,6 +17,7 @@ package org.apache.streampark.console.core.component; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.core.bean.AlertTemplate; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.SavePoint; @@ -28,8 +29,6 @@ import org.apache.streampark.console.core.service.alert.AlertService; import org.apache.streampark.console.core.service.application.ApplicationActionService; import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher; -import org.apache.flink.shaded.guava30.com.google.common.base.Preconditions; - import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import lombok.Data; @@ -120,10 +119,9 @@ public class FlinkCheckpointProcessor { checkPointFailedCache.remove(appId); FailoverStrategyEnum failoverStrategyEnum = FailoverStrategyEnum.of(application.getCpFailureAction()); - Preconditions.checkArgument( + AssertUtils.required( failoverStrategyEnum != null, - "Unexpected cpFailureAction: %s", - application.getCpFailureAction()); + "Unexpected cpFailureAction: " + application.getCpFailureAction()); processFailoverStrategy(application, failoverStrategyEnum); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java index 5af4fc068..22076c597 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.controller; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.core.entity.ExternalLink; import org.apache.streampark.console.core.service.ExternalLinkService; @@ -82,7 +82,7 @@ public class ExternalLinkController { @PostMapping("/update") @RequiresPermissions("externalLink:update") public RestResponse update(@Valid ExternalLink externalLink) { - Utils.requireNotNull(externalLink.getId(), "The link id cannot be null"); + AssertUtils.notNull(externalLink.getId(), "The link id cannot be null"); externalLinkService.update(externalLink); return RestResponse.success(); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java index 1005cb340..9f5bd6c80 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java @@ -20,6 +20,7 @@ package org.apache.streampark.console.core.entity; import org.apache.streampark.common.conf.CommonConfig; import org.apache.streampark.common.conf.InternalConfigHolder; import org.apache.streampark.common.conf.Workspace; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.exception.ApiDetailException; import org.apache.streampark.console.base.util.GitUtils; @@ -28,7 +29,6 @@ import org.apache.streampark.console.core.enums.GitAuthorizedErrorEnum; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.flink.shaded.guava30.com.google.common.base.Preconditions; import com.baomidou.mybatisplus.annotation.FieldStrategy; import com.baomidou.mybatisplus.annotation.IdType; @@ -40,8 +40,6 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.eclipse.jgit.lib.Constants; -import javax.annotation.Nonnull; - import java.io.File; import java.io.IOException; import java.io.Serializable; @@ -188,6 +186,33 @@ public class Project implements Serializable { @JsonIgnore public String getMavenArgs() { + // 1) check build args + String buildArg = getMvnBuildArgs(); + StringBuilder argBuilder = new StringBuilder(); + if (StringUtils.isNotBlank(buildArg)) { + argBuilder.append(buildArg); + } + + // 2) mvn setting file + String mvnSetting = getMvnSetting(); + if (StringUtils.isNotBlank(mvnSetting)) { + argBuilder.append(" --settings ").append(mvnSetting); + } + + // 3) check args + String cmd = argBuilder.toString(); + String illegalArg = getIllegalArgs(cmd); + if (illegalArg != null) { + throw new IllegalArgumentException( + String.format( + "Invalid maven argument, illegal args: %s, in your maven args: %s", illegalArg, cmd)); + } + + String mvn = getMvn(); + return mvn.concat(" ").concat(cmd); + } + + private String getMvn() { boolean windows = Utils.isWindows(); String mvn = windows ? "mvn.cmd" : "mvn"; @@ -200,7 +225,7 @@ public class Project implements Serializable { try { Process process = Runtime.getRuntime().exec(mvn + " --version"); process.waitFor(); - Utils.required(process.exitValue() == 0); + AssertUtils.required(process.exitValue() == 0); useWrapper = false; } catch (Exception ignored) { log.warn("try using user-installed maven failed, now use maven-wrapper."); @@ -210,44 +235,32 @@ public class Project implements Serializable { if (useWrapper) { mvn = WebUtils.getAppHome().concat(windows ? "/bin/mvnw.cmd" : "/bin/mvnw"); } - - return renderCmd(mvn); + return mvn; } - @Nonnull - private String renderCmd(String mvn) { - StringBuilder cmdBuffer = new StringBuilder(mvn).append(" clean package -DskipTests "); - renderCmdByBuildArgs(cmdBuffer); - renderCmdBySetting(cmdBuffer); - return cmdBuffer.toString(); - } - - private void renderCmdByBuildArgs(StringBuilder cmdBuffer) { + private String getMvnBuildArgs() { if (StringUtils.isNotBlank(this.buildArgs)) { String args = getIllegalArgs(this.buildArgs); - Preconditions.checkArgument( + AssertUtils.required( args == null, - "Illegal argument: \"%s\" in maven build parameters: %s", - args, - this.buildArgs); - cmdBuffer.append(this.buildArgs.trim()); + String.format( + "Illegal argument: \"%s\" in maven build parameters: %s", args, this.buildArgs)); + return this.buildArgs.trim(); } + return null; } - private void renderCmdBySetting(StringBuilder cmdBuffer) { + private String getMvnSetting() { String setting = InternalConfigHolder.get(CommonConfig.MAVEN_SETTINGS_PATH()); if (StringUtils.isBlank(setting)) { - return; + return null; } - String args = getIllegalArgs(setting); - Preconditions.checkArgument( - args == null, "Illegal argument \"%s\" in maven-setting file path: %s", args, setting); File file = new File(setting); - Preconditions.checkArgument( + AssertUtils.required( !file.exists() || !file.isFile(), - "Invalid maven-setting file path \"%s\", the path not exist or is not file", - setting); - cmdBuffer.append(" --settings ").append(setting); + String.format( + "Invalid maven-setting file path \"%s\", the path not exist or is not file", setting)); + return setting; } private String getIllegalArgs(String param) { @@ -257,7 +270,7 @@ public class Project implements Serializable { return matcher.group(1) == null ? matcher.group(2) : matcher.group(1); } - Iterator<String> iterator = Arrays.asList(";", "|", "&", ">").iterator(); + Iterator<String> iterator = Arrays.asList(";", "|", "&", ">", "<").iterator(); String[] argsList = param.split("\\s+"); while (iterator.hasNext()) { String chr = iterator.next(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java index 635c2a465..3454dd876 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java @@ -24,8 +24,8 @@ import org.apache.streampark.common.conf.InternalOption; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.StorageType; import org.apache.streampark.common.fs.FsOperator; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.SystemPropertyUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.common.zio.ZIOExt; import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.entity.FlinkEnv; @@ -101,7 +101,7 @@ public class EnvInitializer implements ApplicationRunner { .forEach( key -> { InternalOption config = InternalConfigHolder.getConfig(key); - Utils.requireNotNull(config); + AssertUtils.notNull(config); InternalConfigHolder.set(config, env.getProperty(key, config.classType())); }); @@ -181,7 +181,7 @@ public class EnvInitializer implements ApplicationRunner { private void uploadClientJar(Workspace workspace, FsOperator fsOperator) { File client = WebUtils.getAppClientDir(); - Utils.required( + AssertUtils.required( client.exists() && client.listFiles().length > 0, client.getAbsolutePath().concat(" is not exists or empty directory ")); @@ -198,7 +198,7 @@ public class EnvInitializer implements ApplicationRunner { File[] shims = WebUtils.getAppLibDir() .listFiles(pathname -> pathname.getName().matches(PATTERN_FLINK_SHIMS_JAR.pattern())); - Utils.required(shims != null && shims.length > 0, "streampark-flink-shims jar not exist"); + AssertUtils.required(shims != null && shims.length > 0, "streampark-flink-shims jar not exist"); String appShims = workspace.APP_SHIMS(); fsOperator.delete(appShims); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java index 2974faa64..60047f481 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java @@ -27,11 +27,11 @@ import org.apache.streampark.common.enums.FlinkExecutionMode; import org.apache.streampark.common.enums.FlinkRestoreMode; import org.apache.streampark.common.enums.ResolveOrder; import org.apache.streampark.common.fs.FsOperator; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.ExceptionUtils; import org.apache.streampark.common.util.HadoopUtils; import org.apache.streampark.common.util.PropertiesUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.exception.ApplicationException; import org.apache.streampark.console.core.entity.AppBuildPipeline; @@ -381,7 +381,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, public void start(Application appParam, boolean auto) throws Exception { // 1) check application final Application application = getById(appParam.getId()); - Utils.requireNotNull(application); + AssertUtils.notNull(application); ApiAlertException.throwIfTrue( !application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly."); @@ -397,7 +397,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, } AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId()); - Utils.requireNotNull(buildPipeline); + AssertUtils.notNull(buildPipeline); FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId()); ApiAlertException.throwIfNull(flinkEnv, "[StreamPark] can no found flink version"); @@ -635,7 +635,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, switch (application.getDevelopmentMode()) { case FLINK_SQL: FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false); - Utils.requireNotNull(flinkSql); + AssertUtils.notNull(flinkSql); // 1) dist_userJar String sqlDistJar = commonService.getSqlClientJar(flinkEnv); // 2) appConfig diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index ffcafc27e..3370e6652 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -24,9 +24,9 @@ import org.apache.streampark.common.enums.ApplicationType; import org.apache.streampark.common.enums.FlinkDevelopmentMode; import org.apache.streampark.common.enums.FlinkExecutionMode; import org.apache.streampark.common.fs.FsOperator; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.ExceptionUtils; import org.apache.streampark.common.util.FileUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.util.JacksonUtils; import org.apache.streampark.console.base.util.WebUtils; @@ -194,7 +194,7 @@ public class AppBuildPipeServiceImpl FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false); if (app.isFlinkSqlJobOrPyFlinkJob()) { FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql; - Utils.requireNotNull(flinkSql); + AssertUtils.notNull(flinkSql); app.setDependency(flinkSql.getDependency()); app.setTeamResource(flinkSql.getTeamResource()); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java index 1a88494e4..ebf979c3f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.service.impl; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.ExternalLink; import org.apache.streampark.console.core.enums.PlaceholderTypeEnum; @@ -77,7 +77,7 @@ public class ExternalLinkServiceImpl extends ServiceImpl<ExternalLinkMapper, Ext @Override public List<ExternalLink> render(Long appId) { Application app = applicationManageService.getById(appId); - Utils.requireNotNull(app, "Application doesn't exist"); + AssertUtils.notNull(app, "Application doesn't exist"); List<ExternalLink> externalLink = this.list(); if (externalLink != null && externalLink.size() > 0) { // Render the placeholder @@ -112,10 +112,10 @@ public class ExternalLinkServiceImpl extends ServiceImpl<ExternalLinkMapper, Ext if (result == null) { return true; } - Utils.required( + AssertUtils.required( !result.getBadgeName().equals(params.getBadgeName()), String.format("The name: %s is already existing.", result.getBadgeName())); - Utils.required( + AssertUtils.required( !result.getLinkUrl().equals(params.getLinkUrl()), String.format("The linkUrl: %s is already existing.", result.getLinkUrl())); return false; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java index d648a2294..3599ed8d0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java @@ -17,9 +17,9 @@ package org.apache.streampark.console.core.service.impl; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.ExceptionUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; import org.apache.streampark.console.core.entity.Application; @@ -172,11 +172,11 @@ public class FlinkSqlServiceImpl extends ServiceImpl<FlinkSqlMapper, FlinkSql> @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class) public void rollback(Application application) { FlinkSql sql = getCandidate(application.getId(), CandidateTypeEnum.HISTORY); - Utils.requireNotNull(sql); + AssertUtils.notNull(sql); try { // check and backup current job FlinkSql effectiveSql = getEffective(application.getId(), false); - Utils.requireNotNull(effectiveSql); + AssertUtils.notNull(effectiveSql); // rollback history sql backUpService.rollbackFlinkSql(application, sql); } catch (Exception e) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java index cdb1859b6..778b1a03b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java @@ -21,9 +21,9 @@ import org.apache.streampark.common.Constant; import org.apache.streampark.common.conf.CommonConfig; import org.apache.streampark.common.conf.InternalConfigHolder; import org.apache.streampark.common.conf.Workspace; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.CompletableFutureUtils; import org.apache.streampark.common.util.FileUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.ResponseCode; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.domain.RestResponse; @@ -109,7 +109,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> @Override public boolean update(Project projectParam) { Project project = getById(projectParam.getId()); - Utils.requireNotNull(project); + AssertUtils.notNull(project); ApiAlertException.throwIfFalse( project.getTeamId().equals(projectParam.getTeamId()), "TeamId can't be changed, update project failed."); @@ -155,7 +155,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> @Override public boolean removeById(Long id) { Project project = getById(id); - Utils.requireNotNull(project); + AssertUtils.notNull(project); LambdaQueryWrapper<Application> queryWrapper = new LambdaQueryWrapper<Application>().eq(Application::getProjectId, id); long count = applicationManageService.count(queryWrapper); @@ -227,7 +227,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> @Override public List<String> listModules(Long id) { Project project = getById(id); - Utils.requireNotNull(project); + AssertUtils.notNull(project); if (BuildStateEnum.SUCCESSFUL != BuildStateEnum.of(project.getBuildState()) || !project.getDistHome().exists()) { @@ -293,7 +293,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> } List<Map<String, Object>> confList = new ArrayList<>(); File[] files = unzipFile.listFiles(x -> "conf".equals(x.getName())); - Utils.requireNotNull(files); + AssertUtils.notNull(files); for (File item : files) { eachFile(item, confList, true); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java index 8616e5a21..c973be1f4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java @@ -18,9 +18,9 @@ package org.apache.streampark.console.core.service.impl; import org.apache.streampark.common.enums.FlinkExecutionMode; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.CompletableFutureUtils; import org.apache.streampark.common.util.ExceptionUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.exception.InternalException; @@ -288,7 +288,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint Map<String, Object> properties = new HashMap<>(); if (FlinkExecutionMode.isRemoteMode(application.getFlinkExecutionMode())) { - Utils.requireNotNull( + AssertUtils.notNull( cluster, String.format( "The clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.", @@ -306,7 +306,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint } if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) { if (FlinkExecutionMode.YARN_SESSION == application.getFlinkExecutionMode()) { - Utils.requireNotNull( + AssertUtils.notNull( cluster, String.format( "The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.", @@ -374,7 +374,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint // At the remote mode, request the flink webui interface to get the savepoint path FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId()); - Utils.requireNotNull( + AssertUtils.notNull( cluster, String.format( "The clusterId=%s cannot be find, maybe the clusterId is wrong or " @@ -439,8 +439,8 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint private void expire(SavePoint entity) { FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId()); Application application = applicationManageService.getById(entity.getAppId()); - Utils.requireNotNull(flinkEnv); - Utils.requireNotNull(application); + AssertUtils.notNull(flinkEnv); + AssertUtils.notNull(application); int cpThreshold = tryGetChkNumRetainedFromDynamicProps(application.getDynamicProperties()) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java index 0b70f9d6d..d2ba899f5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.service.impl; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.core.service.SqlCompleteService; import com.google.common.collect.Sets; @@ -56,7 +56,7 @@ public class SqlCompleteServiceImpl implements SqlCompleteService { @Override public List<String> getComplete(String sql) { - if (sql.length() > 0 && BLACK_SET.contains(sql.charAt(sql.length() - 1))) { + if (!sql.isEmpty() && BLACK_SET.contains(sql.charAt(sql.length() - 1))) { return new ArrayList<>(); } String[] temp = sql.split("\\s"); @@ -184,7 +184,7 @@ public class SqlCompleteServiceImpl implements SqlCompleteService { nowStep = nowStep.get(nowChar).getNext(); loc += 1; } - Utils.requireNotNull(preNode); + AssertUtils.notNull(preNode); preNode.setStop(); preNode.setCount(count); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java index 98606541b..d52bdc959 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.core.service.impl; import org.apache.streampark.common.enums.FlinkExecutionMode; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; @@ -73,8 +73,8 @@ public class YarnQueueServiceImpl extends ServiceImpl<YarnQueueMapper, YarnQueue @Override public IPage<YarnQueue> getPage(YarnQueue yarnQueue, RestRequest request) { - Utils.requireNotNull(yarnQueue, "Yarn queue query params mustn't be null."); - Utils.requireNotNull( + AssertUtils.notNull(yarnQueue, "Yarn queue query params mustn't be null."); + AssertUtils.notNull( yarnQueue.getTeamId(), "Team id of yarn queue query params mustn't be null."); Page<YarnQueue> page = MybatisPager.getPage(request); return this.baseMapper.selectPage(page, yarnQueue); @@ -88,8 +88,8 @@ public class YarnQueueServiceImpl extends ServiceImpl<YarnQueueMapper, YarnQueue @Override public ResponseResult<String> checkYarnQueue(YarnQueue yarnQueue) { - Utils.requireNotNull(yarnQueue, "Yarn queue mustn't be empty."); - Utils.requireNotNull(yarnQueue.getTeamId(), "Team id mustn't be null."); + AssertUtils.notNull(yarnQueue, "Yarn queue mustn't be empty."); + AssertUtils.notNull(yarnQueue.getTeamId(), "Team id mustn't be null."); ResponseResult<String> responseResult = new ResponseResult<>(); @@ -206,8 +206,8 @@ public class YarnQueueServiceImpl extends ServiceImpl<YarnQueueMapper, YarnQueue @VisibleForTesting public YarnQueue getYarnQueueByIdWithPreconditions(YarnQueue yarnQueue) { - Utils.requireNotNull(yarnQueue, "Yarn queue mustn't be null."); - Utils.requireNotNull(yarnQueue.getId(), "Yarn queue id mustn't be null."); + AssertUtils.notNull(yarnQueue, "Yarn queue mustn't be null."); + AssertUtils.notNull(yarnQueue.getId(), "Yarn queue id mustn't be null."); YarnQueue queueFromDB = getById(yarnQueue.getId()); ApiAlertException.throwIfNull(queueFromDB, "The queue doesn't exist."); return queueFromDB; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java index e2154d0dd..a03c74e61 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.system.service.impl; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; @@ -152,8 +152,10 @@ public class MemberServiceImpl extends ServiceImpl<MemberMapper, Member> impleme public void updateMember(Member member) { Member oldMember = this.getById(member.getId()); ApiAlertException.throwIfNull(oldMember, "The member [id=%s] not found", member.getId()); - Utils.required(oldMember.getTeamId().equals(member.getTeamId()), "Team id cannot be changed."); - Utils.required(oldMember.getUserId().equals(member.getUserId()), "User id cannot be changed."); + AssertUtils.state( + oldMember.getTeamId().equals(member.getTeamId()), "Team id cannot be changed."); + AssertUtils.state( + oldMember.getUserId().equals(member.getUserId()), "User id cannot be changed."); ApiAlertException.throwIfNull( roleService.getById(member.getRoleId()), "The roleId [%s] not found", member.getRoleId()); oldMember.setRoleId(member.getRoleId()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java index 1ae53feaa..8ce251619 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java @@ -17,8 +17,8 @@ package org.apache.streampark.console.system.service.impl; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.DateUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.ResponseCode; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.domain.RestResponse; @@ -93,7 +93,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us public IPage<User> getPage(User user, RestRequest request) { Page<User> page = MybatisPager.getPage(request); IPage<User> resPage = this.baseMapper.selectPage(page, user); - Utils.requireNotNull(resPage); + AssertUtils.notNull(resPage); if (resPage.getTotal() == 0) { resPage.setRecords(Collections.emptyList()); } @@ -197,7 +197,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us @Override public void setLastTeam(Long teamId, Long userId) { User user = getById(userId); - Utils.requireNotNull(user); + AssertUtils.notNull(user); user.setLastTeamId(teamId); this.baseMapper.updateById(user); } @@ -205,7 +205,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us @Override public void clearLastTeam(Long userId, Long teamId) { User user = getById(userId); - Utils.requireNotNull(user); + AssertUtils.notNull(user); if (!teamId.equals(user.getLastTeamId())) { return; } diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java index 398fce3d5..c7fd1088e 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java @@ -19,10 +19,10 @@ package org.apache.streampark.console; import org.apache.streampark.common.conf.CommonConfig; import org.apache.streampark.common.conf.ConfigKeys; +import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.SystemPropertyUtils; import org.apache.commons.io.FileUtils; -import org.apache.flink.util.Preconditions; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeAll; @@ -104,13 +104,12 @@ public abstract class SpringIntegrationTestBase { } private static File tryFindStreamParkPackagedDirFile() { - String userDir = Preconditions.checkNotNull(SystemPropertyUtils.get("user.dir")); + String userDir = AssertUtils.notNull(SystemPropertyUtils.get("user.dir")); File pkgTargetDirFile = new File(userDir, "target"); - Preconditions.checkState( + AssertUtils.state( pkgTargetDirFile.exists(), - "The target directory of %s doesn't exist. %s", - userDir, - RUN_PKG_SCRIPT_HINT); + String.format( + "The target directory of %s doesn't exist. %s", userDir, RUN_PKG_SCRIPT_HINT)); Optional<File> availablePkgParentFileOpt = Arrays.stream(requireNonNull(pkgTargetDirFile.listFiles(PKG_NAME_FILTER))).findFirst(); final File availablePkgParentFile = diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala index e06a9a1b1..142bedcc3 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala @@ -61,7 +61,7 @@ object DockerRetriever { /** set docker-host for kata */ def setDockerHost(): Unit = { val dockerhost: String = InternalConfigHolder.get(CommonConfig.DOCKER_HOST) - if (Utils.requireNotEmpty(dockerhost)) { + if (Utils.isNotEmpty(dockerhost)) { val dockerHostUri: URI = new URI(dockerhost) dockerHttpClientBuilder.dockerHost(dockerHostUri) } diff --git a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java index f1a8b47e8..326ff1737 100644 --- a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java +++ b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java @@ -17,7 +17,7 @@ package org.apache.streampark.testcontainer.flink; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.AssertUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,9 +98,9 @@ public class FlinkStandaloneSessionCluster implements Startable { @Override public void start() { - Utils.requireNotNull(jobManagerContainer); + AssertUtils.notNull(jobManagerContainer); jobManagerContainer.start(); - Utils.requireNotNull(taskManagerContainers); + AssertUtils.notNull(taskManagerContainers); for (FlinkContainer taskManagerContainer : taskManagerContainers) { taskManagerContainer.start(); } @@ -108,11 +108,11 @@ public class FlinkStandaloneSessionCluster implements Startable { @Override public void stop() { - Utils.requireNotNull(taskManagerContainers); + AssertUtils.notNull(taskManagerContainers); for (FlinkContainer taskManagerContainer : taskManagerContainers) { taskManagerContainer.stop(); } - Utils.requireNotNull(jobManagerContainer); + AssertUtils.notNull(jobManagerContainer); jobManagerContainer.stop(); } @@ -151,13 +151,13 @@ public class FlinkStandaloneSessionCluster implements Startable { } public Builder taskManagerNum(int taskManagerNum) { - Utils.required(taskManagerNum >= 0, "taskManagerNum must be greater than -1."); + AssertUtils.required(taskManagerNum >= 0, "taskManagerNum must be greater than -1."); this.taskManagerNum = taskManagerNum; return this; } public Builder slotsNumPerTm(int slotsNumPerTm) { - Utils.required(slotsNumPerTm > 0, "slotsNumPerTm must be greater than 0."); + AssertUtils.required(slotsNumPerTm > 0, "slotsNumPerTm must be greater than 0."); this.slotsNumPerTm = slotsNumPerTm; return this; }
