This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f2b674ecc [Feature][1.5.0] Linkis orchestrator module supports plugin 
configuration (#5082) (#5120)
8f2b674ecc is described below

commit 8f2b674ecc4c13329acb23bbb1df4d33a0c4d13b
Author: yuechaoyan <[email protected]>
AuthorDate: Tue May 21 11:54:10 2024 +0800

    [Feature][1.5.0] Linkis orchestrator module supports plugin configuration 
(#5082) (#5120)
---
 .../entrance/conf/EntranceSpringConfiguration.java |   7 +-
 .../entrance/conf/EntranceConfiguration.scala      |   3 +
 .../entrance/execute/DefaultEntranceExecutor.scala |   9 ++
 .../linkis/entrance/execute/EntranceExecutor.scala |   8 --
 .../entrance/execute/EntranceExecutorManager.scala |  10 ++
 .../execute/impl/EntranceExecutorManagerImpl.scala |   2 +-
 .../EngineLockListener.scala}                      |  21 +--
 .../execute/simple/SimpleASyncListener.scala       |  59 ++++++++
 .../execute/simple/SimpleEntranceExecutor.scala    | 155 +++++++++++++++++++++
 .../execute/simple/SimpleSyncListener.scala        |  83 +++++++++++
 .../physical/CodeLogicalUnitExecTask.scala         |   4 +
 11 files changed, 331 insertions(+), 30 deletions(-)

diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java
 
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java
index 86b1a91f7a..1cf9a6b4b1 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java
@@ -44,9 +44,6 @@ import org.apache.linkis.entrance.persistence.ResultSetEngine;
 import org.apache.linkis.entrance.scheduler.EntranceGroupFactory;
 import org.apache.linkis.entrance.scheduler.EntranceParallelConsumerManager;
 import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext;
-import org.apache.linkis.orchestrator.ecm.EngineConnManagerBuilder;
-import org.apache.linkis.orchestrator.ecm.EngineConnManagerBuilder$;
-import org.apache.linkis.orchestrator.ecm.entity.Policy;
 import org.apache.linkis.scheduler.Scheduler;
 import org.apache.linkis.scheduler.SchedulerContext;
 import org.apache.linkis.scheduler.executer.ExecutorManager;
@@ -204,9 +201,7 @@ public class EntranceSpringConfiguration {
   @Bean
   @ConditionalOnMissingBean
   public ExecutorManager executorManager(GroupFactory groupFactory) {
-    EngineConnManagerBuilder engineConnManagerBuilder = 
EngineConnManagerBuilder$.MODULE$.builder();
-    engineConnManagerBuilder.setPolicy(Policy.Process);
-    return new EntranceExecutorManagerImpl(groupFactory, 
engineConnManagerBuilder.build());
+    return new EntranceExecutorManagerImpl(groupFactory);
   }
 
   @Bean
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index 6f634bd76b..8e667588fb 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -285,4 +285,7 @@ object EntranceConfiguration {
   val ENTRANCE_FAILOVER_RUNNING_KILL_ENABLED =
     CommonVars("linkis.entrance.failover.running.kill.enable", false)
 
+  val LINKIS_ENTRANCE_SKIP_ORCHESTRATOR =
+    CommonVars("linkis.entrance.skip.orchestrator", false).getValue
+
 }
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index 266de6eb5b..24c697c4ce 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -261,6 +261,15 @@ class DefaultEntranceExecutor(id: Long)
     true
   }
 
+  def getRunningOrchestrationFuture: Option[OrchestrationFuture] = {
+    val asyncReturn = getEngineExecuteAsyncReturn
+    if (asyncReturn.isDefined) {
+      asyncReturn.get.getOrchestrationFuture()
+    } else {
+      None
+    }
+  }
+
   override protected def callExecute(request: ExecuteRequest): ExecuteResponse 
= {
 
     val entranceExecuteRequest: EntranceExecuteRequest = request match {
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
index be7fb13871..d20b5ea8fb 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
@@ -118,14 +118,6 @@ abstract class EntranceExecutor(val id: Long) extends 
Executor with Logging {
     super.hashCode()
   }
 
-  def getRunningOrchestrationFuture: Option[OrchestrationFuture] = {
-    if (null != engineReturn) {
-      engineReturn.getOrchestrationFuture()
-    } else {
-      None
-    }
-  }
-
 }
 
 class EngineExecuteAsyncReturn(
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
index 05bc5311b0..4e7ca79367 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
@@ -20,8 +20,10 @@ package org.apache.linkis.entrance.execute
 import org.apache.linkis.common.exception.WarnException
 import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.entrance.conf.EntranceConfiguration
 import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary._
 import org.apache.linkis.entrance.exception.EntranceErrorException
+import org.apache.linkis.entrance.execute.simple.{SimpleEntranceExecutor, 
SimpleExecuteBusContext}
 import org.apache.linkis.governance.common.entity.job.JobRequest
 import org.apache.linkis.scheduler.executer.{Executor, ExecutorManager}
 import org.apache.linkis.scheduler.queue.{GroupFactory, Job, SchedulerEvent}
@@ -91,6 +93,14 @@ abstract class EntranceExecutorManager(groupFactory: 
GroupFactory)
           case jobReq: JobRequest =>
             val entranceEntranceExecutor =
               new DefaultEntranceExecutor(jobReq.getId)
+            if (EntranceConfiguration.LINKIS_ENTRANCE_SKIP_ORCHESTRATOR) {
+              new SimpleEntranceExecutor(
+                jobReq.getId,
+                SimpleExecuteBusContext.getOrchestratorListenerBusContext()
+              )
+            } else {
+              new DefaultEntranceExecutor(jobReq.getId)
+            }
             // getEngineConn Executor
             job.getLogListener.foreach(
               _.onLogUpdate(
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala
index 3efcf41c89..a251c56de7 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala
@@ -22,7 +22,7 @@ import org.apache.linkis.orchestrator.ecm.EngineConnManager
 import org.apache.linkis.scheduler.listener.ExecutorListener
 import org.apache.linkis.scheduler.queue.GroupFactory
 
-class EntranceExecutorManagerImpl(groupFactory: GroupFactory, 
engineConnManager: EngineConnManager)
+class EntranceExecutorManagerImpl(groupFactory: GroupFactory)
     extends EntranceExecutorManager(groupFactory) {
 
   override def getOrCreateInterceptors(): Array[ExecuteRequestInterceptor] = 
Array(
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/EngineLockListener.scala
similarity index 50%
copy from 
linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala
copy to 
linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/EngineLockListener.scala
index 3efcf41c89..6f2798a52c 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/EngineLockListener.scala
@@ -15,23 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.entrance.execute.impl
+package org.apache.linkis.entrance.execute.simple
+import org.apache.linkis.orchestrator.listener.OrchestratorListenerBusContext
 
-import org.apache.linkis.entrance.execute._
-import org.apache.linkis.orchestrator.ecm.EngineConnManager
-import org.apache.linkis.scheduler.listener.ExecutorListener
-import org.apache.linkis.scheduler.queue.GroupFactory
+object SimpleExecuteBusContext {
 
-class EntranceExecutorManagerImpl(groupFactory: GroupFactory, 
engineConnManager: EngineConnManager)
-    extends EntranceExecutorManager(groupFactory) {
+  private lazy val orchestratorListenerBusContext = 
OrchestratorListenerBusContext.createBusContext
 
-  override def getOrCreateInterceptors(): Array[ExecuteRequestInterceptor] = 
Array(
-    JobExecuteRequestInterceptor,
-    LabelExecuteRequestInterceptor,
-    ReconnectExecuteRequestInterceptor,
-    StorePathExecuteRequestInterceptor,
-    RuntimePropertiesExecuteRequestInterceptor
-  )
+  def getOrchestratorListenerBusContext(): OrchestratorListenerBusContext =
+    orchestratorListenerBusContext
 
-  override def setExecutorListener(engineListener: ExecutorListener): Unit = {}
 }
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleASyncListener.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleASyncListener.scala
new file mode 100644
index 0000000000..bc52fbd800
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleASyncListener.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.entrance.execute.simple
+
+import org.apache.linkis.common.listener.Event
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.entrance.EntranceServer
+import org.apache.linkis.entrance.conf.EntranceConfiguration
+import org.apache.linkis.orchestrator.listener.OrchestratorAsyncEvent
+import org.apache.linkis.orchestrator.listener.task.{
+  TaskLogEvent,
+  TaskLogListener,
+  TaskProgressListener,
+  TaskRunningInfoEvent
+}
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.stereotype.Component
+
+import javax.annotation.PostConstruct
+
+@Component
+class SimpleASyncListener extends TaskLogListener with TaskProgressListener 
with Logging {
+
+  @Autowired private var entranceServer: EntranceServer = _
+
+  @PostConstruct
+  def init(): Unit = {
+    if (EntranceConfiguration.LINKIS_ENTRANCE_SKIP_ORCHESTRATOR) {
+      SimpleExecuteBusContext
+        .getOrchestratorListenerBusContext()
+        .getOrchestratorAsyncListenerBus
+        .addListener(this)
+    }
+  }
+
+  override def onLogUpdate(taskLogEvent: TaskLogEvent): Unit = {}
+
+  override def onProgressOn(taskProgressEvent: TaskRunningInfoEvent): Unit = {}
+
+  override def onEvent(event: OrchestratorAsyncEvent): Unit = {}
+
+  override def onEventError(event: Event, t: Throwable): Unit = {}
+}
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleEntranceExecutor.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleEntranceExecutor.scala
new file mode 100644
index 0000000000..d9e18081d2
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleEntranceExecutor.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.entrance.execute.simple
+
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.entrance.exception.{EntranceErrorCode, 
EntranceErrorException}
+import org.apache.linkis.entrance.execute.{EngineExecuteAsyncReturn, 
EntranceExecutor}
+import org.apache.linkis.entrance.job.EntranceExecuteRequest
+import org.apache.linkis.governance.common.utils.LoggerUtils
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.CodeLanguageLabel
+import org.apache.linkis.manager.label.utils.LabelUtil
+import org.apache.linkis.orchestrator.code.plans.ast.CodeJob
+import 
org.apache.linkis.orchestrator.code.plans.logical.CodeLogicalUnitTaskDesc
+import org.apache.linkis.orchestrator.computation.entity.ComputationJobReq
+import 
org.apache.linkis.orchestrator.computation.physical.CodeLogicalUnitExecTask
+import org.apache.linkis.orchestrator.converter.ASTContextImpl
+import org.apache.linkis.orchestrator.execution.{
+  AsyncTaskResponse,
+  FailedTaskResponse,
+  SucceedTaskResponse
+}
+import org.apache.linkis.orchestrator.listener.OrchestratorListenerBusContext
+import org.apache.linkis.orchestrator.plans.physical.{ExecTask, 
PhysicalContextImpl}
+import org.apache.linkis.orchestrator.plans.unit.CodeLogicalUnit
+import org.apache.linkis.scheduler.executer._
+
+import java.util
+
+class SimpleEntranceExecutor(
+    id: Long,
+    orchestratorListenerBusContext: OrchestratorListenerBusContext
+) extends EntranceExecutor(id)
+    with SingleTaskOperateSupport
+    with Logging {
+
+  private var codeUnitExecTask: CodeLogicalUnitExecTask = null
+
+  override protected def callExecute(request: ExecuteRequest): ExecuteResponse 
= {
+    val entranceExecuteRequest: EntranceExecuteRequest = request match {
+      case request: EntranceExecuteRequest =>
+        request
+      case _ =>
+        throw new EntranceErrorException(
+          EntranceErrorCode.EXECUTE_REQUEST_INVALID.getErrCode,
+          s"Invalid entranceExecuteRequest : ${request.code}"
+        )
+    }
+    // 1. create JobReq
+    val computationJobReq = requestToComputationJobReq(entranceExecuteRequest)
+    // 2. create code job
+    val codeJob = new CodeJob(null, null)
+    val astContext = 
ASTContextImpl.newBuilder().setJobReq(computationJobReq).build()
+    codeJob.setAstContext(astContext)
+    codeJob.setCodeLogicalUnit(computationJobReq.getCodeLogicalUnit)
+    codeJob.setParams(computationJobReq.getParams)
+    codeJob.setName(computationJobReq.getName + "_Job")
+    codeJob.setSubmitUser(computationJobReq.getSubmitUser)
+    codeJob.setExecuteUser(computationJobReq.getExecuteUser)
+    codeJob.setLabels(computationJobReq.getLabels)
+    codeJob.setPriority(computationJobReq.getPriority)
+    codeUnitExecTask = new CodeLogicalUnitExecTask(Array[ExecTask](), 
Array[ExecTask]())
+    // set job id, can find by 
getEntranceContext.getOrCreateScheduler().get(execId).map(_.asInstanceOf[Job])
+    codeUnitExecTask.setId(entranceExecuteRequest.getJob.getId)
+    // 3.set code unit
+    codeUnitExecTask.setCodeLogicalUnit(computationJobReq.getCodeLogicalUnit)
+    codeUnitExecTask.setTaskDesc(CodeLogicalUnitTaskDesc(codeJob))
+    // 4. set context
+    val context = new PhysicalContextImpl(codeUnitExecTask, Array.empty)
+    
context.setSyncBus(orchestratorListenerBusContext.getOrchestratorSyncListenerBus)
+    
context.setAsyncBus(orchestratorListenerBusContext.getOrchestratorAsyncListenerBus)
+    // 5. execute
+    val response = codeUnitExecTask.execute()
+    response match {
+      case async: AsyncTaskResponse =>
+        new EngineExecuteAsyncReturn(request, null)
+      case succeed: SucceedTaskResponse =>
+        logger.info(s"Succeed to execute ExecTask(${getId})")
+        SuccessExecuteResponse()
+      case failedTaskResponse: FailedTaskResponse =>
+        logger.info(s"Failed to execute ExecTask(${getId})")
+        ErrorExecuteResponse(failedTaskResponse.getErrorMsg, 
failedTaskResponse.getCause)
+      case _ =>
+        logger.warn(s"ExecTask(${getId}) need to retry")
+        ErrorExecuteResponse("unknown response: " + response, null)
+    }
+  }
+
+  def requestToComputationJobReq(
+      entranceExecuteRequest: EntranceExecuteRequest
+  ): ComputationJobReq = {
+    val jobReqBuilder = ComputationJobReq.newBuilder()
+    jobReqBuilder.setId(entranceExecuteRequest.jobId())
+    jobReqBuilder.setSubmitUser(entranceExecuteRequest.submitUser())
+    jobReqBuilder.setExecuteUser(entranceExecuteRequest.executeUser())
+    val codeTypeLabel: Label[_] = 
LabelUtil.getCodeTypeLabel(entranceExecuteRequest.getLabels)
+    if (null == codeTypeLabel) {
+      throw new EntranceErrorException(
+        EntranceErrorCode.EXECUTE_REQUEST_INVALID.getErrCode,
+        s"code Type Label is needed"
+      )
+    }
+    val codes = new util.ArrayList[String]()
+    codes.add(entranceExecuteRequest.code())
+    val codeLogicalUnit =
+      new CodeLogicalUnit(codes, codeTypeLabel.asInstanceOf[CodeLanguageLabel])
+    jobReqBuilder.setCodeLogicalUnit(codeLogicalUnit)
+    jobReqBuilder.setLabels(entranceExecuteRequest.getLabels)
+    jobReqBuilder.setExecuteUser(entranceExecuteRequest.executeUser())
+    jobReqBuilder.setParams(entranceExecuteRequest.properties())
+    jobReqBuilder.build().asInstanceOf[ComputationJobReq]
+  }
+
+  override def kill(): Boolean = {
+    LoggerUtils.setJobIdMDC(getId.toString)
+    logger.info("Entrance start to kill job {} invoke Orchestrator ", 
this.getId)
+    Utils.tryAndWarn {
+      if (null != codeUnitExecTask) {
+        codeUnitExecTask.kill()
+      }
+    }
+    LoggerUtils.removeJobIdMDC()
+    true
+  }
+
+  override def pause(): Boolean = {
+    true
+  }
+
+  override def resume(): Boolean = {
+    true
+  }
+
+  override def close(): Unit = {
+    getEngineExecuteAsyncReturn.foreach { e =>
+      e.notifyError(s"$toString has already been completed with state $state.")
+    }
+  }
+
+}
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleSyncListener.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleSyncListener.scala
new file mode 100644
index 0000000000..46107ff701
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleSyncListener.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.entrance.execute.simple
+
+import org.apache.linkis.common.listener.Event
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.entrance.EntranceServer
+import org.apache.linkis.entrance.conf.EntranceConfiguration
+import org.apache.linkis.orchestrator.listener.OrchestratorSyncEvent
+import org.apache.linkis.orchestrator.listener.task.{
+  TaskErrorResponseEvent,
+  TaskResultSetEvent,
+  TaskResultSetListener,
+  TaskResultSetSizeEvent,
+  TaskStatusEvent,
+  TaskStatusListener
+}
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.stereotype.Component
+
+import javax.annotation.PostConstruct
+
+/**
+ * 1.TaskLogListener: job.getLogListener.foreach(_.onLogUpdate(job, 
logEvent.log))
+ *
+ * 2.TaskProgressListener: entranceJob.getProgressListener.foreach( 
_.onProgressUpdate(entranceJob,
+ * progressInfoEvent.progress, entranceJob.getProgressInfo)
+ *
+ * 3.TaskResultSetListener 
entranceContext.getOrCreatePersistenceManager().onResultSizeCreated(j,
+ * taskResultSize.resultSize) .getOrCreatePersistenceManager() 
.onResultSetCreated(
+ * entranceExecuteRequest.getJob, 
AliasOutputExecuteResponse(firstResultSet.alias,
+ * firstResultSet.result) )
+ *
+ * 4. TaskStatusListener getEngineExecuteAsyncReturn.foreach { jobReturn => 
jobReturn.notifyStatus(
+ * ResponseTaskStatus(entranceExecuteRequest.getJob.getId, 
ExecutionNodeStatus.Succeed) ) } val msg
+ * = failedResponse.getErrorCode + ", " + failedResponse.getErrorMsg
+ * getEngineExecuteAsyncReturn.foreach { jobReturn => 
jobReturn.notifyError(msg,
+ * failedResponse.getCause) jobReturn.notifyStatus(
+ * ResponseTaskStatus(entranceExecuteRequest.getJob.getId, 
ExecutionNodeStatus.Failed) ) }
+ */
+@Component
+class SimpleSyncListener extends TaskStatusListener with TaskResultSetListener 
with Logging {
+
+  @Autowired private var entranceServer: EntranceServer = _
+
+  @PostConstruct
+  def init(): Unit = {
+    if (EntranceConfiguration.LINKIS_ENTRANCE_SKIP_ORCHESTRATOR) {
+      SimpleExecuteBusContext
+        .getOrchestratorListenerBusContext()
+        .getOrchestratorSyncListenerBus
+        .addListener(this)
+    }
+  }
+
+  override def onStatusUpdate(taskStatusEvent: TaskStatusEvent): Unit = {}
+
+  override def onTaskErrorResponseEvent(taskErrorResponseEvent: 
TaskErrorResponseEvent): Unit = {}
+
+  override def onResultSetCreate(taskResultSetEvent: TaskResultSetEvent): Unit 
= {}
+
+  override def onResultSizeCreated(taskResultSetSizeEvent: 
TaskResultSetSizeEvent): Unit = {}
+
+  override def onSyncEvent(event: OrchestratorSyncEvent): Unit = {}
+
+  override def onEventError(event: Event, t: Throwable): Unit = {}
+}
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
index 82e75c0bd7..0b915ab768 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
@@ -226,6 +226,10 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], 
children: Array[ExecTask
     id
   }
 
+  def setId(id: String): Unit = {
+    this.id = id
+  }
+
   override def getPhysicalContext: PhysicalContext = physicalContext
 
   def getCodeLogicalUnit: CodeLogicalUnit = this.codeLogicalUnit


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to