This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 8f2b674ecc [Feature][1.5.0] Linkis orchestrator module supports plugin
configuration (#5082) (#5120)
8f2b674ecc is described below
commit 8f2b674ecc4c13329acb23bbb1df4d33a0c4d13b
Author: yuechaoyan <[email protected]>
AuthorDate: Tue May 21 11:54:10 2024 +0800
[Feature][1.5.0] Linkis orchestrator module supports plugin configuration
(#5082) (#5120)
---
.../entrance/conf/EntranceSpringConfiguration.java | 7 +-
.../entrance/conf/EntranceConfiguration.scala | 3 +
.../entrance/execute/DefaultEntranceExecutor.scala | 9 ++
.../linkis/entrance/execute/EntranceExecutor.scala | 8 --
.../entrance/execute/EntranceExecutorManager.scala | 10 ++
.../execute/impl/EntranceExecutorManagerImpl.scala | 2 +-
.../EngineLockListener.scala} | 21 +--
.../execute/simple/SimpleASyncListener.scala | 59 ++++++++
.../execute/simple/SimpleEntranceExecutor.scala | 155 +++++++++++++++++++++
.../execute/simple/SimpleSyncListener.scala | 83 +++++++++++
.../physical/CodeLogicalUnitExecTask.scala | 4 +
11 files changed, 331 insertions(+), 30 deletions(-)
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java
index 86b1a91f7a..1cf9a6b4b1 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java
+++
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java
@@ -44,9 +44,6 @@ import org.apache.linkis.entrance.persistence.ResultSetEngine;
import org.apache.linkis.entrance.scheduler.EntranceGroupFactory;
import org.apache.linkis.entrance.scheduler.EntranceParallelConsumerManager;
import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext;
-import org.apache.linkis.orchestrator.ecm.EngineConnManagerBuilder;
-import org.apache.linkis.orchestrator.ecm.EngineConnManagerBuilder$;
-import org.apache.linkis.orchestrator.ecm.entity.Policy;
import org.apache.linkis.scheduler.Scheduler;
import org.apache.linkis.scheduler.SchedulerContext;
import org.apache.linkis.scheduler.executer.ExecutorManager;
@@ -204,9 +201,7 @@ public class EntranceSpringConfiguration {
@Bean
@ConditionalOnMissingBean
public ExecutorManager executorManager(GroupFactory groupFactory) {
- EngineConnManagerBuilder engineConnManagerBuilder =
EngineConnManagerBuilder$.MODULE$.builder();
- engineConnManagerBuilder.setPolicy(Policy.Process);
- return new EntranceExecutorManagerImpl(groupFactory,
engineConnManagerBuilder.build());
+ return new EntranceExecutorManagerImpl(groupFactory);
}
@Bean
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index 6f634bd76b..8e667588fb 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -285,4 +285,7 @@ object EntranceConfiguration {
val ENTRANCE_FAILOVER_RUNNING_KILL_ENABLED =
CommonVars("linkis.entrance.failover.running.kill.enable", false)
+ val LINKIS_ENTRANCE_SKIP_ORCHESTRATOR =
+ CommonVars("linkis.entrance.skip.orchestrator", false).getValue
+
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index 266de6eb5b..24c697c4ce 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -261,6 +261,15 @@ class DefaultEntranceExecutor(id: Long)
true
}
+ def getRunningOrchestrationFuture: Option[OrchestrationFuture] = {
+ val asyncReturn = getEngineExecuteAsyncReturn
+ if (asyncReturn.isDefined) {
+ asyncReturn.get.getOrchestrationFuture()
+ } else {
+ None
+ }
+ }
+
override protected def callExecute(request: ExecuteRequest): ExecuteResponse
= {
val entranceExecuteRequest: EntranceExecuteRequest = request match {
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
index be7fb13871..d20b5ea8fb 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
@@ -118,14 +118,6 @@ abstract class EntranceExecutor(val id: Long) extends
Executor with Logging {
super.hashCode()
}
- def getRunningOrchestrationFuture: Option[OrchestrationFuture] = {
- if (null != engineReturn) {
- engineReturn.getOrchestrationFuture()
- } else {
- None
- }
- }
-
}
class EngineExecuteAsyncReturn(
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
index 05bc5311b0..4e7ca79367 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
@@ -20,8 +20,10 @@ package org.apache.linkis.entrance.execute
import org.apache.linkis.common.exception.WarnException
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary._
import org.apache.linkis.entrance.exception.EntranceErrorException
+import org.apache.linkis.entrance.execute.simple.{SimpleEntranceExecutor,
SimpleExecuteBusContext}
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.scheduler.executer.{Executor, ExecutorManager}
import org.apache.linkis.scheduler.queue.{GroupFactory, Job, SchedulerEvent}
@@ -91,6 +93,14 @@ abstract class EntranceExecutorManager(groupFactory:
GroupFactory)
case jobReq: JobRequest =>
val entranceEntranceExecutor =
new DefaultEntranceExecutor(jobReq.getId)
+ if (EntranceConfiguration.LINKIS_ENTRANCE_SKIP_ORCHESTRATOR) {
+ new SimpleEntranceExecutor(
+ jobReq.getId,
+ SimpleExecuteBusContext.getOrchestratorListenerBusContext()
+ )
+ } else {
+ new DefaultEntranceExecutor(jobReq.getId)
+ }
// getEngineConn Executor
job.getLogListener.foreach(
_.onLogUpdate(
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala
index 3efcf41c89..a251c56de7 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala
@@ -22,7 +22,7 @@ import org.apache.linkis.orchestrator.ecm.EngineConnManager
import org.apache.linkis.scheduler.listener.ExecutorListener
import org.apache.linkis.scheduler.queue.GroupFactory
-class EntranceExecutorManagerImpl(groupFactory: GroupFactory,
engineConnManager: EngineConnManager)
+class EntranceExecutorManagerImpl(groupFactory: GroupFactory)
extends EntranceExecutorManager(groupFactory) {
override def getOrCreateInterceptors(): Array[ExecuteRequestInterceptor] =
Array(
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/EngineLockListener.scala
similarity index 50%
copy from
linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala
copy to
linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/EngineLockListener.scala
index 3efcf41c89..6f2798a52c 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/EngineLockListener.scala
@@ -15,23 +15,14 @@
* limitations under the License.
*/
-package org.apache.linkis.entrance.execute.impl
+package org.apache.linkis.entrance.execute.simple
+import org.apache.linkis.orchestrator.listener.OrchestratorListenerBusContext
-import org.apache.linkis.entrance.execute._
-import org.apache.linkis.orchestrator.ecm.EngineConnManager
-import org.apache.linkis.scheduler.listener.ExecutorListener
-import org.apache.linkis.scheduler.queue.GroupFactory
+object SimpleExecuteBusContext {
-class EntranceExecutorManagerImpl(groupFactory: GroupFactory,
engineConnManager: EngineConnManager)
- extends EntranceExecutorManager(groupFactory) {
+ private lazy val orchestratorListenerBusContext =
OrchestratorListenerBusContext.createBusContext
- override def getOrCreateInterceptors(): Array[ExecuteRequestInterceptor] =
Array(
- JobExecuteRequestInterceptor,
- LabelExecuteRequestInterceptor,
- ReconnectExecuteRequestInterceptor,
- StorePathExecuteRequestInterceptor,
- RuntimePropertiesExecuteRequestInterceptor
- )
+ def getOrchestratorListenerBusContext(): OrchestratorListenerBusContext =
+ orchestratorListenerBusContext
- override def setExecutorListener(engineListener: ExecutorListener): Unit = {}
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleASyncListener.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleASyncListener.scala
new file mode 100644
index 0000000000..bc52fbd800
--- /dev/null
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleASyncListener.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.entrance.execute.simple
+
+import org.apache.linkis.common.listener.Event
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.entrance.EntranceServer
+import org.apache.linkis.entrance.conf.EntranceConfiguration
+import org.apache.linkis.orchestrator.listener.OrchestratorAsyncEvent
+import org.apache.linkis.orchestrator.listener.task.{
+ TaskLogEvent,
+ TaskLogListener,
+ TaskProgressListener,
+ TaskRunningInfoEvent
+}
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.stereotype.Component
+
+import javax.annotation.PostConstruct
+
+@Component
+class SimpleASyncListener extends TaskLogListener with TaskProgressListener
with Logging {
+
+ @Autowired private var entranceServer: EntranceServer = _
+
+ @PostConstruct
+ def init(): Unit = {
+ if (EntranceConfiguration.LINKIS_ENTRANCE_SKIP_ORCHESTRATOR) {
+ SimpleExecuteBusContext
+ .getOrchestratorListenerBusContext()
+ .getOrchestratorAsyncListenerBus
+ .addListener(this)
+ }
+ }
+
+ override def onLogUpdate(taskLogEvent: TaskLogEvent): Unit = {}
+
+ override def onProgressOn(taskProgressEvent: TaskRunningInfoEvent): Unit = {}
+
+ override def onEvent(event: OrchestratorAsyncEvent): Unit = {}
+
+ override def onEventError(event: Event, t: Throwable): Unit = {}
+}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleEntranceExecutor.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleEntranceExecutor.scala
new file mode 100644
index 0000000000..d9e18081d2
--- /dev/null
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleEntranceExecutor.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.entrance.execute.simple
+
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.entrance.exception.{EntranceErrorCode,
EntranceErrorException}
+import org.apache.linkis.entrance.execute.{EngineExecuteAsyncReturn,
EntranceExecutor}
+import org.apache.linkis.entrance.job.EntranceExecuteRequest
+import org.apache.linkis.governance.common.utils.LoggerUtils
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.CodeLanguageLabel
+import org.apache.linkis.manager.label.utils.LabelUtil
+import org.apache.linkis.orchestrator.code.plans.ast.CodeJob
+import
org.apache.linkis.orchestrator.code.plans.logical.CodeLogicalUnitTaskDesc
+import org.apache.linkis.orchestrator.computation.entity.ComputationJobReq
+import
org.apache.linkis.orchestrator.computation.physical.CodeLogicalUnitExecTask
+import org.apache.linkis.orchestrator.converter.ASTContextImpl
+import org.apache.linkis.orchestrator.execution.{
+ AsyncTaskResponse,
+ FailedTaskResponse,
+ SucceedTaskResponse
+}
+import org.apache.linkis.orchestrator.listener.OrchestratorListenerBusContext
+import org.apache.linkis.orchestrator.plans.physical.{ExecTask,
PhysicalContextImpl}
+import org.apache.linkis.orchestrator.plans.unit.CodeLogicalUnit
+import org.apache.linkis.scheduler.executer._
+
+import java.util
+
+class SimpleEntranceExecutor(
+ id: Long,
+ orchestratorListenerBusContext: OrchestratorListenerBusContext
+) extends EntranceExecutor(id)
+ with SingleTaskOperateSupport
+ with Logging {
+
+ private var codeUnitExecTask: CodeLogicalUnitExecTask = null
+
+ override protected def callExecute(request: ExecuteRequest): ExecuteResponse
= {
+ val entranceExecuteRequest: EntranceExecuteRequest = request match {
+ case request: EntranceExecuteRequest =>
+ request
+ case _ =>
+ throw new EntranceErrorException(
+ EntranceErrorCode.EXECUTE_REQUEST_INVALID.getErrCode,
+ s"Invalid entranceExecuteRequest : ${request.code}"
+ )
+ }
+ // 1. create JobReq
+ val computationJobReq = requestToComputationJobReq(entranceExecuteRequest)
+ // 2. create code job
+ val codeJob = new CodeJob(null, null)
+ val astContext =
ASTContextImpl.newBuilder().setJobReq(computationJobReq).build()
+ codeJob.setAstContext(astContext)
+ codeJob.setCodeLogicalUnit(computationJobReq.getCodeLogicalUnit)
+ codeJob.setParams(computationJobReq.getParams)
+ codeJob.setName(computationJobReq.getName + "_Job")
+ codeJob.setSubmitUser(computationJobReq.getSubmitUser)
+ codeJob.setExecuteUser(computationJobReq.getExecuteUser)
+ codeJob.setLabels(computationJobReq.getLabels)
+ codeJob.setPriority(computationJobReq.getPriority)
+ codeUnitExecTask = new CodeLogicalUnitExecTask(Array[ExecTask](),
Array[ExecTask]())
+ // set job id, can find by
getEntranceContext.getOrCreateScheduler().get(execId).map(_.asInstanceOf[Job])
+ codeUnitExecTask.setId(entranceExecuteRequest.getJob.getId)
+ // 3.set code unit
+ codeUnitExecTask.setCodeLogicalUnit(computationJobReq.getCodeLogicalUnit)
+ codeUnitExecTask.setTaskDesc(CodeLogicalUnitTaskDesc(codeJob))
+ // 4. set context
+ val context = new PhysicalContextImpl(codeUnitExecTask, Array.empty)
+
context.setSyncBus(orchestratorListenerBusContext.getOrchestratorSyncListenerBus)
+
context.setAsyncBus(orchestratorListenerBusContext.getOrchestratorAsyncListenerBus)
+ // 5. execute
+ val response = codeUnitExecTask.execute()
+ response match {
+ case async: AsyncTaskResponse =>
+ new EngineExecuteAsyncReturn(request, null)
+ case succeed: SucceedTaskResponse =>
+ logger.info(s"Succeed to execute ExecTask(${getId})")
+ SuccessExecuteResponse()
+ case failedTaskResponse: FailedTaskResponse =>
+ logger.info(s"Failed to execute ExecTask(${getId})")
+ ErrorExecuteResponse(failedTaskResponse.getErrorMsg,
failedTaskResponse.getCause)
+ case _ =>
+ logger.warn(s"ExecTask(${getId}) need to retry")
+ ErrorExecuteResponse("unknown response: " + response, null)
+ }
+ }
+
+ def requestToComputationJobReq(
+ entranceExecuteRequest: EntranceExecuteRequest
+ ): ComputationJobReq = {
+ val jobReqBuilder = ComputationJobReq.newBuilder()
+ jobReqBuilder.setId(entranceExecuteRequest.jobId())
+ jobReqBuilder.setSubmitUser(entranceExecuteRequest.submitUser())
+ jobReqBuilder.setExecuteUser(entranceExecuteRequest.executeUser())
+ val codeTypeLabel: Label[_] =
LabelUtil.getCodeTypeLabel(entranceExecuteRequest.getLabels)
+ if (null == codeTypeLabel) {
+ throw new EntranceErrorException(
+ EntranceErrorCode.EXECUTE_REQUEST_INVALID.getErrCode,
+ s"code Type Label is needed"
+ )
+ }
+ val codes = new util.ArrayList[String]()
+ codes.add(entranceExecuteRequest.code())
+ val codeLogicalUnit =
+ new CodeLogicalUnit(codes, codeTypeLabel.asInstanceOf[CodeLanguageLabel])
+ jobReqBuilder.setCodeLogicalUnit(codeLogicalUnit)
+ jobReqBuilder.setLabels(entranceExecuteRequest.getLabels)
+ jobReqBuilder.setExecuteUser(entranceExecuteRequest.executeUser())
+ jobReqBuilder.setParams(entranceExecuteRequest.properties())
+ jobReqBuilder.build().asInstanceOf[ComputationJobReq]
+ }
+
+ override def kill(): Boolean = {
+ LoggerUtils.setJobIdMDC(getId.toString)
+ logger.info("Entrance start to kill job {} invoke Orchestrator ",
this.getId)
+ Utils.tryAndWarn {
+ if (null != codeUnitExecTask) {
+ codeUnitExecTask.kill()
+ }
+ }
+ LoggerUtils.removeJobIdMDC()
+ true
+ }
+
+ override def pause(): Boolean = {
+ true
+ }
+
+ override def resume(): Boolean = {
+ true
+ }
+
+ override def close(): Unit = {
+ getEngineExecuteAsyncReturn.foreach { e =>
+ e.notifyError(s"$toString has already been completed with state $state.")
+ }
+ }
+
+}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleSyncListener.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleSyncListener.scala
new file mode 100644
index 0000000000..46107ff701
--- /dev/null
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleSyncListener.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.entrance.execute.simple
+
+import org.apache.linkis.common.listener.Event
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.entrance.EntranceServer
+import org.apache.linkis.entrance.conf.EntranceConfiguration
+import org.apache.linkis.orchestrator.listener.OrchestratorSyncEvent
+import org.apache.linkis.orchestrator.listener.task.{
+ TaskErrorResponseEvent,
+ TaskResultSetEvent,
+ TaskResultSetListener,
+ TaskResultSetSizeEvent,
+ TaskStatusEvent,
+ TaskStatusListener
+}
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.stereotype.Component
+
+import javax.annotation.PostConstruct
+
+/**
+ * 1.TaskLogListener: job.getLogListener.foreach(_.onLogUpdate(job,
logEvent.log))
+ *
+ * 2.TaskProgressListener: entranceJob.getProgressListener.foreach(
_.onProgressUpdate(entranceJob,
+ * progressInfoEvent.progress, entranceJob.getProgressInfo)
+ *
+ * 3.TaskResultSetListener
entranceContext.getOrCreatePersistenceManager().onResultSizeCreated(j,
+ * taskResultSize.resultSize) .getOrCreatePersistenceManager()
.onResultSetCreated(
+ * entranceExecuteRequest.getJob,
AliasOutputExecuteResponse(firstResultSet.alias,
+ * firstResultSet.result) )
+ *
+ * 4. TaskStatusListener getEngineExecuteAsyncReturn.foreach { jobReturn =>
jobReturn.notifyStatus(
+ * ResponseTaskStatus(entranceExecuteRequest.getJob.getId,
ExecutionNodeStatus.Succeed) ) } val msg
+ * = failedResponse.getErrorCode + ", " + failedResponse.getErrorMsg
+ * getEngineExecuteAsyncReturn.foreach { jobReturn =>
jobReturn.notifyError(msg,
+ * failedResponse.getCause) jobReturn.notifyStatus(
+ * ResponseTaskStatus(entranceExecuteRequest.getJob.getId,
ExecutionNodeStatus.Failed) ) }
+ */
+@Component
+class SimpleSyncListener extends TaskStatusListener with TaskResultSetListener
with Logging {
+
+ @Autowired private var entranceServer: EntranceServer = _
+
+ @PostConstruct
+ def init(): Unit = {
+ if (EntranceConfiguration.LINKIS_ENTRANCE_SKIP_ORCHESTRATOR) {
+ SimpleExecuteBusContext
+ .getOrchestratorListenerBusContext()
+ .getOrchestratorSyncListenerBus
+ .addListener(this)
+ }
+ }
+
+ override def onStatusUpdate(taskStatusEvent: TaskStatusEvent): Unit = {}
+
+ override def onTaskErrorResponseEvent(taskErrorResponseEvent:
TaskErrorResponseEvent): Unit = {}
+
+ override def onResultSetCreate(taskResultSetEvent: TaskResultSetEvent): Unit
= {}
+
+ override def onResultSizeCreated(taskResultSetSizeEvent:
TaskResultSetSizeEvent): Unit = {}
+
+ override def onSyncEvent(event: OrchestratorSyncEvent): Unit = {}
+
+ override def onEventError(event: Event, t: Throwable): Unit = {}
+}
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
index 82e75c0bd7..0b915ab768 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
@@ -226,6 +226,10 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask],
children: Array[ExecTask
id
}
+ def setId(id: String): Unit = {
+ this.id = id
+ }
+
override def getPhysicalContext: PhysicalContext = physicalContext
def getCodeLogicalUnit: CodeLogicalUnit = this.codeLogicalUnit
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]