This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b8bec8221 [INLONG-4212][Manager] The processor executor maybe throws a 
null pointer exception (#4213)
b8bec8221 is described below

commit b8bec82219f444e62095c16875981f498457cb96
Author: healchow <[email protected]>
AuthorDate: Mon May 16 14:24:47 2022 +0800

    [INLONG-4212][Manager] The processor executor maybe throws a null pointer 
exception (#4213)
---
 .../workflow/core/impl/ProcessorExecutorImpl.java  | 32 ++++++++++++----------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
index f4e1aab1b..694044946 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
@@ -103,8 +103,10 @@ public class ProcessorExecutorImpl implements 
ProcessorExecutor {
 
         // If it is a continuous task execution transaction isolation
         if (element instanceof WorkflowTask) {
-            transactionHelper.execute(executeCompleteInTransaction(element, 
context),
-                    TransactionDefinition.PROPAGATION_NESTED);
+            TransactionCallback<Object> callback = 
executeCompleteInTransaction(element, context);
+            if (callback != null) {
+                transactionHelper.execute(callback, 
TransactionDefinition.PROPAGATION_NESTED);
+            }
             return;
         }
 
@@ -120,7 +122,9 @@ public class ProcessorExecutorImpl implements 
ProcessorExecutor {
             return;
         }
         List<Element> nextElements = processor.next(element, context);
-        nextElements.forEach(next -> executeStart(next, context));
+        for (Element next : nextElements) {
+            executeStart(next, context);
+        }
     }
 
     private boolean isSkipCurrentElement(Element element, WorkflowContext 
context) {
@@ -150,20 +154,20 @@ public class ProcessorExecutorImpl implements 
ProcessorExecutor {
         // Execute next
         context.getActionContext().setAction(((NextableElement) 
element).defaultNextAction());
         List<Element> nextElements = processor.next(element, context);
-        nextElements.forEach(next -> executeStart(next, context));
+        for (Element next : nextElements) {
+            executeStart(next, context);
+        }
     }
 
     private TransactionCallback<Object> executeCompleteInTransaction(Element 
element, WorkflowContext context) {
-        return s -> {
-            try {
-                executeComplete(element, context);
-                return null;
-            } catch (WorkflowNoRollbackException e) { // Exception does not 
roll back
-                throw e;
-            } catch (Exception e) { // The exception is only rolled back once
-                throw new WorkflowRollbackOnceException(e.getMessage());
-            }
-        };
+        try {
+            executeComplete(element, context);
+            return null;
+        } catch (WorkflowNoRollbackException e) { // Exception does not roll 
back
+            throw e;
+        } catch (Exception e) { // The exception is only rolled back once
+            throw new WorkflowRollbackOnceException(e.getMessage());
+        }
     }
 
 }

Reply via email to