This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new eb5de3101 [ISSUE-3142][Improve] Extract the exception utils methods into ExceptionUtils. (#3143)
eb5de3101 is described below
commit eb5de31017d2dd7573bfe3486ad51d9c17f0b733
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Sep 19 20:15:22 2023 +0800
[ISSUE-3142][Improve] Extract the exception utils methods into ExceptionUtils. (#3143)
* [ISSUE-3142][Improve] Extract the exception utils methods into ExceptionUtils.
* Add license header
---
.../streampark/common/util/ExceptionUtils.java | 29 ++++++++++++++--------
.../org/apache/streampark/common/util/Utils.scala | 19 --------------
.../console/base/exception/AlertException.java | 4 +--
.../console/base/exception/ApiDetailException.java | 6 ++---
.../impl/ApplicationActionServiceImpl.java | 5 ++--
.../impl/ApplicationInfoServiceImpl.java | 3 ++-
.../core/service/impl/AppBuildPipeServiceImpl.java | 6 +++--
.../core/service/impl/FlinkSqlServiceImpl.java | 4 ++-
.../core/service/impl/ResourceServiceImpl.java | 7 +++---
.../core/service/impl/SavePointServiceImpl.java | 3 ++-
.../flink/client/trait/YarnClientTrait.scala | 4 +--
.../streampark/flink/core/FlinkSqlValidator.scala | 4 +--
12 files changed, 45 insertions(+), 49 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/AlertException.java
b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
similarity index 56%
copy from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/AlertException.java
copy to
streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
index 9763f67c0..712686493 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/AlertException.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
@@ -15,20 +15,27 @@
* limitations under the License.
*/
-package org.apache.streampark.console.base.exception;
+package org.apache.streampark.common.util;
-import org.apache.streampark.common.util.Utils;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
-public class AlertException extends ApiAlertException {
- public AlertException(String message) {
- super(message);
- }
+/** Utils to process exception message. */
+public class ExceptionUtils {
- public AlertException(Throwable cause) {
- super(Utils.stringifyException(cause));
- }
+ private ExceptionUtils() {}
- public AlertException(String message, Throwable cause) {
- super(message, cause);
+ public static String stringifyException(Throwable throwable) {
+ if (throwable == null) {
+ return "(null)";
+ }
+ try (StringWriter stm = new StringWriter();
+ PrintWriter writer = new PrintWriter(stm)) {
+ throwable.printStackTrace(writer);
+ return stm.toString();
+ } catch (IOException e) {
+ return e.getClass().getName() + " (error while printing stack trace)";
+ }
}
}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index cd6981717..8e98a7b92 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -98,9 +98,6 @@ object Utils {
def copyProperties(original: Properties, target: Properties): Unit =
original.foreach(x => target.put(x._1, x._2))
- /** get os name */
- def getOsName: String = OS
-
def isLinux: Boolean = OS.indexOf("linux") >= 0
def isWindows: Boolean = OS.indexOf("windows") >= 0
@@ -126,22 +123,6 @@ object Utils {
result
}
- def stringifyException(e: Throwable): String = {
- if (e == null) "(null)"
- else {
- try {
- val stm = new StringWriter
- new PrintWriter(stm).autoClose {
- writer =>
- e.printStackTrace(writer)
- stm.toString
- }
- } catch {
- case _: Throwable => e.getClass.getName + " (error while printing stack trace)"
- }
- }
- }
-
def close(closeable: AutoCloseable*)(implicit func: Throwable => Unit = null): Unit = {
closeable.foreach(
c => {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/AlertException.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/AlertException.java
index 9763f67c0..812ebed4e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/AlertException.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/AlertException.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.base.exception;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.ExceptionUtils;
public class AlertException extends ApiAlertException {
public AlertException(String message) {
@@ -25,7 +25,7 @@ public class AlertException extends ApiAlertException {
}
public AlertException(Throwable cause) {
- super(Utils.stringifyException(cause));
+ super(ExceptionUtils.stringifyException(cause));
}
public AlertException(String message, Throwable cause) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/ApiDetailException.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/ApiDetailException.java
index 6385f0167..5adbd8c72 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/ApiDetailException.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/ApiDetailException.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.base.exception;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.console.base.domain.ResponseCode;
/**
@@ -37,11 +37,11 @@ public class ApiDetailException extends
AbstractApiException {
}
public ApiDetailException(Throwable cause) {
- super(Utils.stringifyException(cause), ResponseCode.CODE_FAIL_DETAIL);
+ super(ExceptionUtils.stringifyException(cause), ResponseCode.CODE_FAIL_DETAIL);
}
public ApiDetailException(String message, Throwable cause) {
- super(message + Utils.stringifyException(cause), ResponseCode.CODE_FAIL_DETAIL);
+ super(message + ExceptionUtils.stringifyException(cause), ResponseCode.CODE_FAIL_DETAIL);
}
@Override
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 2cba9acc2..a364c1db9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -27,6 +27,7 @@ import org.apache.streampark.common.enums.RestoreMode;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.common.util.ThreadUtils;
@@ -360,7 +361,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
FlinkAppHttpWatcher.unWatching(application.getId());
}
- String exception = Utils.stringifyException(e);
+ String exception = ExceptionUtils.stringifyException(e);
applicationLog.setException(exception);
applicationLog.setSuccess(false);
}
@@ -520,7 +521,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
if (e.getCause() instanceof CancellationException) {
updateToStopped(application);
} else {
- String exception = Utils.stringifyException(e);
+ String exception = ExceptionUtils.stringifyException(e);
applicationLog.setException(exception);
applicationLog.setSuccess(false);
Application app = getById(appParam.getId());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index c4e1b27d3..528f31169 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.fs.LfsOperator;
+import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
@@ -212,7 +213,7 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
}
return true;
} catch (Exception e) {
- log.error(Utils.stringifyException(e));
+ log.error(ExceptionUtils.stringifyException(e));
throw new ApiDetailException(e);
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 4e0bfa483..8c3120638 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.DevelopmentMode;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
@@ -347,13 +348,14 @@ public class AppBuildPipeServiceImpl
commonService.getUserId(),
app.getId(),
app.getJobName().concat(" release failed"),
- Utils.stringifyException(snapshot.error().exception()),
+ ExceptionUtils.stringifyException(snapshot.error().exception()),
NoticeType.EXCEPTION);
messageService.push(message);
app.setRelease(ReleaseState.FAILED.get());
app.setOptionState(OptionState.NONE.getValue());
app.setBuild(true);
- applicationLog.setException(Utils.stringifyException(snapshot.error().exception()));
+ applicationLog.setException(
+ ExceptionUtils.stringifyException(snapshot.error().exception()));
applicationLog.setSuccess(false);
}
applicationManageService.updateRelease(app);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index c5e6961a1..0c4abe195 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -18,6 +18,7 @@
package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.Constant;
import org.apache.streampark.console.base.domain.RestRequest;
@@ -202,7 +203,8 @@ public class FlinkSqlServiceImpl extends
ServiceImpl<FlinkSqlMapper, FlinkSql>
}
return FlinkShimsProxy.getObject(this.getClass().getClassLoader(), result);
} catch (Throwable e) {
- log.error("verifySql invocationTargetException: {}", Utils.stringifyException(e));
+ log.error(
+ "verifySql invocationTargetException: {}", ExceptionUtils.stringifyException(e));
}
return null;
});
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index c5e35c8b6..30fb4fdf8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
@@ -284,7 +285,7 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
} catch (Exception e) {
// get jarFile error
resp.put("state", 1);
- resp.put("exception", Utils.stringifyException(e));
+ resp.put("exception", ExceptionUtils.stringifyException(e));
return RestResponse.success().data(resp);
}
if (jarFile.getName().endsWith(ConfigConst.PYTHON_SUFFIX())) {
@@ -321,7 +322,7 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
} catch (Exception e) {
// connector download is null
resp.put("state", 1);
- resp.put("exception", Utils.stringifyException(e));
+ resp.put("exception", ExceptionUtils.stringifyException(e));
return RestResponse.success().data(resp);
}
String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
@@ -343,7 +344,7 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
} catch (Exception e) {
// flink connector invalid
resp.put("state", 2);
- resp.put("exception", Utils.stringifyException(e));
+ resp.put("exception", ExceptionUtils.stringifyException(e));
return RestResponse.success().data(resp);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 67d609fe3..526ef2565 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.CompletableFutureUtils;
+import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.Constant;
@@ -254,7 +255,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
},
e -> {
log.error("Trigger savepoint for flink job failed.", e);
- String exception = Utils.stringifyException(e);
+ String exception = ExceptionUtils.stringifyException(e);
applicationLog.setException(exception);
if (!(e instanceof TimeoutException)) {
applicationLog.setSuccess(false);
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 6cc667c6e..456b260c2 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -17,8 +17,8 @@
package org.apache.streampark.flink.client.`trait`
+import org.apache.streampark.common.util.{ExceptionUtils, Utils}
import org.apache.streampark.common.util.ImplicitsUtils._
-import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.bean._
import org.apache.flink.api.common.JobID
@@ -61,7 +61,7 @@ trait YarnClientTrait extends FlinkClientTrait {
case e =>
throw new FlinkException(
s"[StreamPark] Do ${request.getClass.getSimpleName} for the job ${request.jobId} failed. " +
- s"detail: ${Utils.stringifyException(e)}");
+ s"detail: ${ExceptionUtils.stringifyException(e)}");
}.get)
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 721a143de..36cc11d02 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.core
import org.apache.streampark.common.enums.FlinkSqlValidationFailedType
-import org.apache.streampark.common.util.{Logger, Utils}
+import org.apache.streampark.common.util.{ExceptionUtils, Logger, Utils}
import org.apache.streampark.flink.core.SqlCommand._
import org.apache.calcite.config.Lex
@@ -104,7 +104,7 @@ object FlinkSqlValidator extends Logger {
}
} match {
case Failure(e) =>
- val exception = Utils.stringifyException(e)
+ val exception = ExceptionUtils.stringifyException(e)
val causedBy = exception.drop(exception.indexOf("Caused by:"))
val cleanUpError = exception.replaceAll("[\r\n]", "")
if (SYNTAX_ERROR_REGEXP.findAllMatchIn(cleanUpError).nonEmpty) {