This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 84fd9f171d68c39fc2b8238be17728f5ff728ab1 Author: kipshi <[email protected]> AuthorDate: Wed Jun 8 00:05:47 2022 +0800 [INLONG-4556][Manager] Optimize the operation for the workflow processor (#4558) --- .../manager/workflow/core/TransactionHelper.java | 3 +- .../workflow/core/impl/ProcessorExecutorImpl.java | 28 --------------- .../workflow/processor/ServiceTaskProcessor.java | 41 +++++++++++++++------- 3 files changed, 31 insertions(+), 41 deletions(-) diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java index 704d2bc0b..2cf61e4ba 100644 --- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java @@ -33,10 +33,11 @@ import org.springframework.util.Assert; import java.lang.reflect.UndeclaredThrowableException; /** - * Transaction Helper + * Transaction Helper, now deprecated because we use @Transactional instead */ @Slf4j @Service +@Deprecated public class TransactionHelper { @Autowired diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java index bf9c055f3..5366cf39c 100644 --- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java @@ -21,15 +21,11 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.inlong.manager.common.exceptions.WorkflowException; -import org.apache.inlong.manager.common.exceptions.WorkflowNoRollbackException; -import org.apache.inlong.manager.common.exceptions.WorkflowRollbackOnceException; import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.core.ProcessorExecutor; -import org.apache.inlong.manager.workflow.core.TransactionHelper; import org.apache.inlong.manager.workflow.definition.Element; import org.apache.inlong.manager.workflow.definition.NextableElement; import org.apache.inlong.manager.workflow.definition.SkippableElement; -import org.apache.inlong.manager.workflow.definition.WorkflowTask; import org.apache.inlong.manager.workflow.processor.ElementProcessor; import org.apache.inlong.manager.workflow.processor.EndEventProcessor; import org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor; @@ -38,8 +34,6 @@ import org.apache.inlong.manager.workflow.processor.StartEventProcessor; import org.apache.inlong.manager.workflow.processor.UserTaskProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.support.TransactionCallback; import javax.annotation.PostConstruct; import java.util.List; @@ -53,8 +47,6 @@ public class ProcessorExecutorImpl implements ProcessorExecutor { private ImmutableMap<Class<? extends Element>, ElementProcessor<? extends Element>> elementProcessor; - @Autowired - private TransactionHelper transactionHelper; @Autowired private StartEventProcessor startEventProcessor; @Autowired @@ -101,15 +93,6 @@ public class ProcessorExecutorImpl implements ProcessorExecutor { return; } - // If it is a continuous task execution transaction isolation - if (element instanceof WorkflowTask) { - TransactionCallback<Object> callback = executeCompleteInTransaction(element, context); - if (callback != null) { - transactionHelper.execute(callback, TransactionDefinition.PROPAGATION_NESTED); - } - return; - } - executeComplete(element, context); } @@ -159,15 +142,4 @@ public class ProcessorExecutorImpl implements ProcessorExecutor { } } - private TransactionCallback<Object> executeCompleteInTransaction(Element element, WorkflowContext context) { - try { - executeComplete(element, context); - return null; - } catch (WorkflowNoRollbackException e) { // Exception does not roll back - throw e; - } catch (Exception e) { // The exception is only rolled back once - throw new WorkflowRollbackOnceException(e.getMessage()); - } - } - } diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java index c779ba04d..73e58bd2b 100644 --- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java @@ -20,6 +20,7 @@ package org.apache.inlong.manager.workflow.processor; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.inlong.manager.common.enums.TaskStatus; import org.apache.inlong.manager.common.exceptions.JsonException; @@ -49,6 +50,7 @@ import java.util.Set; */ @Service @NoArgsConstructor +@Slf4j public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> { private static final Set<WorkflowAction> SUPPORT_ACTIONS = ImmutableSet.of( @@ -77,27 +79,29 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> { public void create(ServiceTask serviceTask, WorkflowContext context) { WorkflowTaskEntity workflowTaskEntity = saveTaskEntity(serviceTask, context); context.getNewTaskList().add(workflowTaskEntity); - serviceTask.initListeners(context); - this.taskEventNotifier.notify(TaskEvent.CREATE, context); + try { + serviceTask.initListeners(context); + this.taskEventNotifier.notify(TaskEvent.CREATE, context); + } catch (Exception e) { + log.error("Create service task failed", e); + ActionContext actionContext = new WorkflowContext.ActionContext() + .setTask((WorkflowTask) context.getCurrentElement()) + .setRemark("failed when create"); + completeTaskEntity(actionContext, workflowTaskEntity, TaskStatus.FAILED); + this.taskEventNotifier.notify(TaskEvent.FAIL, context); + this.processEventNotifier.notify(ProcessEvent.FAIL, context); + } } @Override public boolean pendingForAction(WorkflowContext context) { - context.setActionContext( - new WorkflowContext.ActionContext() - .setTask((WorkflowTask) context.getCurrentElement()) - .setAction(WorkflowAction.COMPLETE) - .setTaskEntity(context.getNewTaskList().get(0)) - ); - context.getNewTaskList().clear(); return false; } @Override public boolean complete(WorkflowContext context) { + resetActionContext(context); WorkflowContext.ActionContext actionContext = context.getActionContext(); - Preconditions.checkTrue(SUPPORT_ACTIONS.contains(actionContext.getAction()), - "serviceTask not support action: " + actionContext.getAction()); WorkflowTaskEntity workflowTaskEntity = actionContext.getTaskEntity(); Preconditions.checkTrue(ALLOW_COMPLETE_STATE.contains(TaskStatus.valueOf(workflowTaskEntity.getStatus())), "task status should allow complete"); @@ -107,6 +111,7 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> { completeTaskEntity(actionContext, workflowTaskEntity, TaskStatus.COMPLETED); return true; } catch (Exception e) { + log.error("Complete service task failed", e); completeTaskEntity(actionContext, workflowTaskEntity, TaskStatus.FAILED); this.taskEventNotifier.notify(TaskEvent.FAIL, context); this.processEventNotifier.notify(ProcessEvent.FAIL, context); @@ -114,6 +119,16 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> { } } + private void resetActionContext(WorkflowContext context) { + context.setActionContext( + new WorkflowContext.ActionContext() + .setTask((WorkflowTask) context.getCurrentElement()) + .setAction(WorkflowAction.COMPLETE) + .setTaskEntity(context.getNewTaskList().get(0)) + ); + context.getNewTaskList().clear(); + } + private WorkflowTaskEntity saveTaskEntity(ServiceTask serviceTask, WorkflowContext context) { WorkflowProcessEntity workflowProcessEntity = context.getProcessEntity(); List<String> approvers = ApproverAssign.DEFAULT_SYSTEM_APPROVER.assign(context); @@ -139,7 +154,9 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> { taskEntity.setOperator(taskEntity.getApprovers()); taskEntity.setRemark(actionContext.getRemark()); try { - taskEntity.setFormData(objectMapper.writeValueAsString(actionContext.getForm())); + if (actionContext.getForm() != null) { + taskEntity.setFormData(objectMapper.writeValueAsString(actionContext.getForm())); + } } catch (Exception e) { throw new JsonException("write form to json error: ", e); }
