This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b8bec8221 [INLONG-4212][Manager] The processor executor may throw a
null pointer exception (#4213)
b8bec8221 is described below
commit b8bec82219f444e62095c16875981f498457cb96
Author: healchow <[email protected]>
AuthorDate: Mon May 16 14:24:47 2022 +0800
[INLONG-4212][Manager] The processor executor may throw a null pointer
exception (#4213)
---
.../workflow/core/impl/ProcessorExecutorImpl.java | 32 ++++++++++++----------
1 file changed, 18 insertions(+), 14 deletions(-)
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
index f4e1aab1b..694044946 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
@@ -103,8 +103,10 @@ public class ProcessorExecutorImpl implements
ProcessorExecutor {
// If it is a continuous task execution transaction isolation
if (element instanceof WorkflowTask) {
- transactionHelper.execute(executeCompleteInTransaction(element,
context),
- TransactionDefinition.PROPAGATION_NESTED);
+ TransactionCallback<Object> callback =
executeCompleteInTransaction(element, context);
+ if (callback != null) {
+ transactionHelper.execute(callback,
TransactionDefinition.PROPAGATION_NESTED);
+ }
return;
}
@@ -120,7 +122,9 @@ public class ProcessorExecutorImpl implements
ProcessorExecutor {
return;
}
List<Element> nextElements = processor.next(element, context);
- nextElements.forEach(next -> executeStart(next, context));
+ for (Element next : nextElements) {
+ executeStart(next, context);
+ }
}
private boolean isSkipCurrentElement(Element element, WorkflowContext
context) {
@@ -150,20 +154,20 @@ public class ProcessorExecutorImpl implements
ProcessorExecutor {
// Execute next
context.getActionContext().setAction(((NextableElement)
element).defaultNextAction());
List<Element> nextElements = processor.next(element, context);
- nextElements.forEach(next -> executeStart(next, context));
+ for (Element next : nextElements) {
+ executeStart(next, context);
+ }
}
private TransactionCallback<Object> executeCompleteInTransaction(Element
element, WorkflowContext context) {
- return s -> {
- try {
- executeComplete(element, context);
- return null;
- } catch (WorkflowNoRollbackException e) { // Exception does not
roll back
- throw e;
- } catch (Exception e) { // The exception is only rolled back once
- throw new WorkflowRollbackOnceException(e.getMessage());
- }
- };
+ try {
+ executeComplete(element, context);
+ return null;
+ } catch (WorkflowNoRollbackException e) { // Exception does not roll
back
+ throw e;
+ } catch (Exception e) { // The exception is only rolled back once
+ throw new WorkflowRollbackOnceException(e.getMessage());
+ }
}
}