This is an automated email from the ASF dual-hosted git repository. casion pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/linkis.git
commit f71dc8f42feb83c034c166049d423d31f8188f79
Author: peacewong <[email protected]>
AuthorDate: Tue Jun 27 22:13:28 2023 +0800

    need to throw cause exception #4711
---
 .../executor/execute/ConcurrentComputationExecutor.scala |  4 +++-
 .../am/service/engine/DefaultEngineAskEngineService.java | 12 +++++++-----
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
index a9447109d..f192a4525 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
@@ -42,7 +42,9 @@ abstract class ConcurrentComputationExecutor(override val outputPrintLimit: Int
         transition(NodeStatus.Busy)
       }
     }
-    logger.info(s"engineConnTask(${engineConnTask.getTaskId}) running task is ($getRunningTask) ")
+    logger.info(
+      s"engineConnTask(${engineConnTask.getTaskId}) running task is ($getRunningTask) status ${getStatus}"
+    )
     val response = super.execute(engineConnTask)
     if (getStatus == NodeStatus.Busy && getConcurrentLimit > getRunningTask) {
       EXECUTOR_STATUS_LOCKER.synchronized {
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java
index cc17e68a9..36675ff84 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java
@@ -35,9 +35,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.net.SocketTimeoutException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import feign.RetryableException;
@@ -152,6 +150,11 @@ public class DefaultEngineAskEngineService extends AbstractEngineService
     createNodeThread.whenComplete(
         (EngineNode engineNode, Throwable exception) -> {
           LoggerUtils.setJobIdMDC(taskId);
+          if (exception instanceof CompletionException || exception instanceof ExecutionException) {
+            if (exception.getCause() != null) {
+              exception = exception.getCause();
+            }
+          }
           if (exception != null) {
             boolean retryFlag;
             if (exception instanceof LinkisRetryException) {
@@ -177,8 +180,7 @@ public class DefaultEngineAskEngineService extends AbstractEngineService
             } else {
               logger.info(
                   String.format(
-                      "msg: %s canRetry Exception: %s", msg, exception.getClass().getName()),
-                  exception);
+                      "msg: %s canRetry Exception: %s", msg, exception.getClass().getName()));
             }
             sender.send(
                 new EngineCreateError(

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
