This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 729bb446a [INLONG-4300][Manager] Autowired the workflow resources to
simply the bean management (#4302)
729bb446a is described below
commit 729bb446a8351f792a324ec3b877097c985af469
Author: healzhou <[email protected]>
AuthorDate: Mon May 23 15:51:54 2022 +0800
[INLONG-4300][Manager] Autowired the workflow resources to simply the bean
management (#4302)
* [INLONG-4300][Manager] Autowired the workflow resources to simply the
bean management
* [INLONG-4300][Manager] Replace JsonUtils with global ObjectMapper
---
.../client/api/inner/InnerInlongManagerClient.java | 15 +-
.../client/api/util/InlongGroupTransfer.java | 17 +-
.../inlong/manager/common/auth/Authentication.java | 22 +--
.../manager/common/auth/DefaultAuthentication.java | 3 +-
.../manager/common/auth/SecretAuthentication.java | 3 +-
.../common/auth/SecretTokenAuthentication.java | 3 +-
.../manager/common/auth/TokenAuthentication.java | 3 +-
.../inlong/manager/common/util/JsonUtils.java | 173 ---------------------
.../plugin/listener/DeleteSortListener.java | 6 +-
.../plugin/listener/RestartSortListener.java | 12 +-
.../plugin/listener/StartupSortListener.java | 12 +-
.../plugin/listener/SuspendSortListener.java | 6 +-
.../manager/service/CommonOperateService.java | 28 ++--
.../core/operationlog/OperationLogRecorder.java | 19 ++-
.../service/sort/PushSortConfigListener.java | 14 +-
.../workflow/WorkflowDefinitionRegister.java | 14 +-
.../service/workflow/WorkflowEngineConfig.java | 81 ----------
.../service/workflow/WorkflowServiceImpl.java | 33 ++--
.../NewConsumptionProcessDetailHandler.java | 16 +-
.../manager/service/sort/DisableZkForSortTest.java | 5 +-
.../source/listener/DataSourceListenerTest.java | 8 +-
.../service/workflow/WorkflowServiceImplTest.java | 14 +-
.../manager/workflow/core/TransactionHelper.java | 10 +-
.../manager/workflow/core/WorkflowEngine.java | 53 -------
.../core/impl/EventListenerServiceImpl.java | 44 +++---
.../core/impl/ProcessDefinitionServiceImpl.java | 10 +-
.../workflow/core/impl/ProcessServiceImpl.java | 23 ++-
.../workflow/core/impl/ProcessorExecutorImpl.java | 48 +++---
.../workflow/core/impl/TaskServiceImpl.java | 17 +-
.../core/impl/WorkflowContextBuilderImpl.java | 77 +++++----
.../workflow/core/impl/WorkflowEngineImpl.java | 115 --------------
.../workflow/core/impl/WorkflowEventNotifier.java | 48 ------
.../core/impl/WorkflowQueryServiceImpl.java | 7 +-
.../manager/workflow/definition/ServiceTask.java | 14 +-
.../event/EventListenerManagerFactory.java | 61 --------
.../workflow/event/LogableEventListener.java | 7 +-
.../event/process/ProcessEventListenerManager.java | 4 +
.../event/process/ProcessEventNotifier.java | 14 +-
.../event/task/TaskEventListenerManager.java | 2 +
.../workflow/event/task/TaskEventNotifier.java | 9 +-
.../AbstractNextableElementProcessor.java | 3 +-
.../workflow/processor/AbstractTaskProcessor.java | 6 +-
.../workflow/processor/EndEventProcessor.java | 21 ++-
.../workflow/processor/ServiceTaskProcessor.java | 35 +++--
.../workflow/processor/StartEventProcessor.java | 29 ++--
.../workflow/processor/UserTaskProcessor.java | 73 +++++----
.../manager/workflow/util/WorkflowBeanUtils.java | 38 ++++-
.../workflow/util/WorkflowFormParserUtils.java | 24 +--
48 files changed, 439 insertions(+), 860 deletions(-)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index ec66e9258..915d58d08 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -56,7 +56,6 @@ import
org.apache.inlong.manager.common.pojo.transform.TransformResponse;
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.common.util.AssertUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
import java.util.List;
@@ -66,6 +65,7 @@ import java.util.List;
@Slf4j
public class InnerInlongManagerClient {
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
protected static final String HTTP_PATH = "api/inlong/manager";
protected final OkHttpClient httpClient;
@@ -178,7 +178,7 @@ public class InnerInlongManagerClient {
pageNum = 1;
}
- ObjectNode groupQuery = JsonUtils.OBJECT_MAPPER.createObjectNode();
+ ObjectNode groupQuery = OBJECT_MAPPER.createObjectNode();
groupQuery.put("keyword", keyword);
groupQuery.put("status", status);
groupQuery.put("pageNum", pageNum);
@@ -234,9 +234,9 @@ public class InnerInlongManagerClient {
assert response.body() != null;
String body = response.body().string();
assertHttpSuccess(response, body, path);
- return JsonUtils.parse(body,
- new
TypeReference<Response<PageInfo<InlongGroupListResponse>>>() {
- });
+
+ return OBJECT_MAPPER.readValue(body, new
TypeReference<Response<PageInfo<InlongGroupListResponse>>>() {
+ });
}
}
@@ -818,12 +818,11 @@ public class InnerInlongManagerClient {
public WorkflowResult startInlongGroup(int taskId,
Pair<InlongGroupApproveRequest, List<InlongStreamApproveRequest>>
initMsg) {
- ObjectMapper objectMapper = JsonUtils.OBJECT_MAPPER;
- ObjectNode workflowTaskOperation = objectMapper.createObjectNode();
+ ObjectNode workflowTaskOperation = OBJECT_MAPPER.createObjectNode();
workflowTaskOperation.putPOJO("transferTo", Lists.newArrayList());
workflowTaskOperation.put("remark", "approved by system");
- ObjectNode inlongGroupApproveForm = objectMapper.createObjectNode();
+ ObjectNode inlongGroupApproveForm = OBJECT_MAPPER.createObjectNode();
inlongGroupApproveForm.putPOJO("groupApproveInfo", initMsg.getKey());
inlongGroupApproveForm.putPOJO("streamApproveInfoList",
initMsg.getValue());
inlongGroupApproveForm.put("formName", "InlongGroupApproveForm");
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index 8ee84685c..f96ed3acd 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.client.api.util;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
@@ -40,7 +41,6 @@ import
org.apache.inlong.manager.common.pojo.sort.FlinkSortConf;
import org.apache.inlong.manager.common.pojo.sort.UserDefinedSortConf;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.AssertUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
import java.util.ArrayList;
import java.util.List;
@@ -51,6 +51,8 @@ import java.util.Map;
*/
public class InlongGroupTransfer {
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
/**
* Parse MQ config from inlong group info.
*/
@@ -256,7 +258,11 @@ public class InlongGroupTransfer {
if (MapUtils.isNotEmpty(flinkSortConf.getProperties())) {
InlongGroupExtInfo flinkProperties = new InlongGroupExtInfo();
flinkProperties.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
-
flinkProperties.setKeyValue(JsonUtils.toJson(flinkSortConf.getProperties()));
+ try {
+
flinkProperties.setKeyValue(OBJECT_MAPPER.writeValueAsString(flinkSortConf.getProperties()));
+ } catch (Exception e) {
+ throw new RuntimeException("get json for sort properties
error: " + e.getMessage());
+ }
extInfos.add(flinkProperties);
}
return extInfos;
@@ -278,7 +284,12 @@ public class InlongGroupTransfer {
if (MapUtils.isNotEmpty(userDefinedSortConf.getProperties())) {
InlongGroupExtInfo flinkProperties = new InlongGroupExtInfo();
flinkProperties.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
-
flinkProperties.setKeyValue(JsonUtils.toJson(userDefinedSortConf.getProperties()));
+ flinkProperties.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+ try {
+
flinkProperties.setKeyValue(OBJECT_MAPPER.writeValueAsString(userDefinedSortConf.getProperties()));
+ } catch (Exception e) {
+ throw new RuntimeException("get json for sort properties
error: " + e.getMessage());
+ }
extInfos.add(flinkProperties);
}
return extInfos;
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
index 72c709baa..9691d44c1 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
@@ -17,22 +17,25 @@
package org.apache.inlong.manager.common.auth;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import java.util.Locale;
import java.util.Map;
public interface Authentication {
+ ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ AuthType getAuthType();
+
+ void configure(Map<String, String> properties);
+
enum AuthType {
UNAME_PASSWD,
TOKEN,
SECRET,
SECRET_AND_TOKEN;
- @Override
- public String toString() {
- return this.name().toLowerCase(Locale.ROOT);
- }
-
public static AuthType forType(String type) {
for (AuthType authType : values()) {
if (authType.name().equals(type.toUpperCase())) {
@@ -41,9 +44,10 @@ public interface Authentication {
}
throw new IllegalArgumentException(String.format("Unsupported
authType=%s for Inlong", type));
}
- }
- AuthType getAuthType();
-
- void configure(Map<String, String> properties);
+ @Override
+ public String toString() {
+ return this.name().toLowerCase(Locale.ROOT);
+ }
+ }
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/DefaultAuthentication.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/DefaultAuthentication.java
index 9534f3b9b..b2a146dbe 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/DefaultAuthentication.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/DefaultAuthentication.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.util.AssertUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
import java.util.Map;
@@ -60,7 +59,7 @@ public class DefaultAuthentication implements Authentication {
@Override
public String toString() {
- ObjectNode objectNode = JsonUtils.OBJECT_MAPPER.createObjectNode();
+ ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
objectNode.put(USER_NAME, this.getUserName());
objectNode.put(PASSWORD, this.getPassword());
return objectNode.toString();
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretAuthentication.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretAuthentication.java
index 29ca21cde..1c37d05e3 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretAuthentication.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretAuthentication.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.util.AssertUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
import java.util.Map;
@@ -60,7 +59,7 @@ public class SecretAuthentication implements Authentication {
@Override
public String toString() {
- ObjectNode objectNode = JsonUtils.OBJECT_MAPPER.createObjectNode();
+ ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
objectNode.put(SECRET_ID, this.getSecretId());
objectNode.put(SECRET_KEY, this.getSecretKey());
return objectNode.toString();
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretTokenAuthentication.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretTokenAuthentication.java
index 88e50686c..5908629c1 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretTokenAuthentication.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretTokenAuthentication.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
-import org.apache.inlong.manager.common.util.JsonUtils;
import java.util.Map;
@@ -56,7 +55,7 @@ public class SecretTokenAuthentication extends
SecretAuthentication {
@SneakyThrows
@Override
public String toString() {
- ObjectNode objectNode = JsonUtils.OBJECT_MAPPER.createObjectNode();
+ ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
objectNode.put(SECRET_ID, this.getSecretId());
objectNode.put(SECRET_KEY, this.getSecretKey());
objectNode.put(SECRET_TOKEN, this.getSToken());
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/TokenAuthentication.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/TokenAuthentication.java
index 88defdecf..7857bf235 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/TokenAuthentication.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/TokenAuthentication.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.util.AssertUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
import java.util.Map;
@@ -53,7 +52,7 @@ public class TokenAuthentication implements Authentication {
@Override
public String toString() {
- ObjectNode objectNode = JsonUtils.OBJECT_MAPPER.createObjectNode();
+ ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
objectNode.put(TOKEN, this.getToken());
return objectNode.toString();
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
deleted file mode 100644
index af36f6c34..000000000
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.util;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.PropertyAccessor;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.exceptions.JsonException;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * JSON utils
- */
-@Slf4j
-@UtilityClass
-public class JsonUtils {
-
- public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- static {
- OBJECT_MAPPER.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
- OBJECT_MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS,
false);
-
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
- }
-
- /**
- * The instance need to transform to string
- *
- * @param obj instance need to transform
- * @return JSON string after transform
- */
- public static String toJson(Object obj) {
- if (obj == null) {
- return null;
- }
- if (obj.getClass() == String.class) {
- return (String) obj;
- }
- try {
- return OBJECT_MAPPER.writeValueAsString(obj);
- } catch (JsonProcessingException e) {
- log.error("JSON transform error: {}", obj, e);
- throw new JsonException("JSON transform error");
- }
- }
-
- /**
- * Transform the string to specify type
- *
- * @param json JSON string
- * @param type specify type
- * @param <T> type
- * @return instance of type T
- */
- public static <T> T parse(String json, TypeReference<T> type) {
- try {
- return OBJECT_MAPPER.readValue(json, type);
- } catch (IOException e) {
- log.error("JSON transform error: {}", json, e);
- throw new JsonException("JSON transform error");
- }
- }
-
- /**
- * Transform the string to JsonNode
- *
- * @param json JSON string
- * @return JsonNode instance after transform
- */
- public static JsonNode parse(String json) {
- if (StringUtils.isBlank(json)) {
- return null;
- }
- try {
- return OBJECT_MAPPER.readTree(json);
- } catch (IOException e) {
- log.error("JSON transform error: {}", json, e);
- throw new JsonException("JSON transform error");
- }
- }
-
- /**
- * Transform JSON string to Java instance
- *
- * @param json JSON string
- * @param tClass Java instance type
- * @param <T> type
- * @return java instance after transform
- */
- public static <T> T parse(String json, Class<T> tClass) {
- try {
- return OBJECT_MAPPER.readValue(json, tClass);
- } catch (IOException e) {
- log.error("JSON transform error: {}", json, e);
- throw new JsonException("JSON transform error");
- }
- }
-
- public static <T> T parse(String json, JavaType javaType) {
- try {
- return OBJECT_MAPPER.readValue(json, javaType);
- } catch (IOException e) {
- log.error("JSON transform error: {}", json, e);
- throw new JsonException("JSON transform error");
- }
- }
-
- /**
- * Transform JSON string to List
- *
- * @param json JSON string
- * @param eClass element class
- * @param <E> element type
- * @return list after transform
- */
- public static <E> List<E> parseList(String json, Class<E> eClass) {
- try {
- return OBJECT_MAPPER.readValue(json,
-
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, eClass));
- } catch (IOException e) {
- log.error("JSON transform error: {}", json, e);
- throw new JsonException("JSON transform error");
- }
- }
-
- /**
- * Transform JSON string to Map
- *
- * @param json JSON string
- * @param kClass key type in Map
- * @param vClass value type in Map
- * @param <K> key type
- * @param <V> value type
- * @return map after transform
- */
- public static <K, V> Map<K, V> parseMap(String json, Class<K> kClass,
Class<V> vClass) {
- try {
- return OBJECT_MAPPER.readValue(json,
- OBJECT_MAPPER.getTypeFactory().constructMapType(Map.class,
kClass, vClass));
- } catch (IOException e) {
- log.error("JSON transform error: {}", json, e);
- throw new JsonException("JSON transform error");
- }
- }
-
-}
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index e16256823..ccdb162ef 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
@@ -25,7 +26,6 @@ import
org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -46,6 +46,8 @@ import static
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
@Slf4j
public class DeleteSortListener implements SortOperateListener {
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
@@ -76,7 +78,7 @@ public class DeleteSortListener implements
SortOperateListener {
return ListenerResult.fail(message);
}
- Map<String, String> result =
JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+ Map<String, String> result =
OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
new TypeReference<Map<String, String>>() {
});
kvConf.putAll(result);
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 511fd2f0e..485d1bca4 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
@@ -26,11 +27,10 @@ import
org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -50,6 +50,8 @@ import static
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
@Slf4j
public class RestartSortListener implements SortOperateListener {
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
@@ -80,7 +82,7 @@ public class RestartSortListener implements
SortOperateListener {
return ListenerResult.fail(message);
}
- Map<String, String> result =
JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+ Map<String, String> result =
OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
new TypeReference<Map<String, String>>() {
});
kvConf.putAll(result);
@@ -97,8 +99,8 @@ public class RestartSortListener implements
SortOperateListener {
}
// TODO Support more than one dataflow in one sort job
- Map<String, JsonNode> dataflowMap =
JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(dataFlows), new
TypeReference<Map<String, JsonNode>>() {
+ Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
+ OBJECT_MAPPER.readTree(dataFlows), new
TypeReference<Map<String, JsonNode>>() {
});
Optional<JsonNode> dataflowOptional =
dataflowMap.values().stream().findFirst();
JsonNode dataFlow = null;
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index ede84db6f..150780967 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
@@ -26,11 +27,10 @@ import
org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -50,6 +50,8 @@ import static
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
@Slf4j
public class StartupSortListener implements SortOperateListener {
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
@@ -76,7 +78,7 @@ public class StartupSortListener implements
SortOperateListener {
InlongGroupExtInfo::getKeyValue));
String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
if (StringUtils.isNotEmpty(sortExt)) {
- Map<String, String> result =
JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+ Map<String, String> result =
OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
new TypeReference<Map<String, String>>() {
});
kvConf.putAll(result);
@@ -88,8 +90,8 @@ public class StartupSortListener implements
SortOperateListener {
log.error(message);
return ListenerResult.fail(message);
}
- Map<String, JsonNode> dataflowMap =
JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(dataFlows), new
TypeReference<Map<String, JsonNode>>() {
+ Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
+ OBJECT_MAPPER.readTree(dataFlows), new
TypeReference<Map<String, JsonNode>>() {
});
Optional<JsonNode> dataflowOptional =
dataflowMap.values().stream().findFirst();
JsonNode dataFlow = null;
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index f4f9d44c6..16ebfd291 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
@@ -25,7 +26,6 @@ import
org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -46,6 +46,8 @@ import static
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
@Slf4j
public class SuspendSortListener implements SortOperateListener {
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
@@ -76,7 +78,7 @@ public class SuspendSortListener implements
SortOperateListener {
return ListenerResult.fail(message);
}
- Map<String, String> result =
JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+ Map<String, String> result =
OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
new TypeReference<Map<String, String>>() {
});
kvConf.putAll(result);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index 626ac9fa6..157952c85 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -26,7 +27,6 @@ import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity;
@@ -50,6 +50,8 @@ public class CommonOperateService {
private static final Logger LOGGER =
LoggerFactory.getLogger(CommonOperateService.class);
+ @Autowired
+ public ObjectMapper objectMapper;
@Autowired
private InlongGroupEntityMapper groupMapper;
@Autowired
@@ -137,14 +139,22 @@ public class CommonOperateService {
if (clusterEntity == null ||
StringUtils.isBlank(clusterEntity.getExtParams())) {
throw new BusinessException("pulsar cluster or pulsar ext params
is empty");
}
- Map<String, String> configParams =
JsonUtils.parse(clusterEntity.getExtParams(), Map.class);
- PulsarClusterInfo pulsarClusterInfo =
PulsarClusterInfo.builder().brokerServiceUrl(
-
clusterEntity.getUrl()).token(clusterEntity.getToken()).build();
- String adminUrl =
configParams.get(InlongGroupSettings.PULSAR_ADMIN_URL);
- Preconditions.checkNotNull(adminUrl, "adminUrl is empty, check third
party cluster table");
- pulsarClusterInfo.setAdminUrl(adminUrl);
- pulsarClusterInfo.setType(clusterEntity.getType());
- return pulsarClusterInfo;
+
+ PulsarClusterInfo pulsarCluster = PulsarClusterInfo.builder()
+ .brokerServiceUrl(clusterEntity.getUrl())
+ .token(clusterEntity.getToken())
+ .build();
+ try {
+ Map<String, String> configParams =
objectMapper.readValue(clusterEntity.getExtParams(), Map.class);
+ String adminUrl =
configParams.get(InlongGroupSettings.PULSAR_ADMIN_URL);
+ pulsarCluster.setAdminUrl(adminUrl);
+ } catch (Exception e) {
+ LOGGER.error("parse pulsar cluster info error: ", e);
+ }
+
+ Preconditions.checkNotNull(pulsarCluster.getAdminUrl(), "adminUrl is
empty, check third party cluster table");
+ pulsarCluster.setType(clusterEntity.getType());
+ return pulsarCluster;
}
/**
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operationlog/OperationLogRecorder.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operationlog/OperationLogRecorder.java
index 597afdc25..82a20ef72 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operationlog/OperationLogRecorder.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operationlog/OperationLogRecorder.java
@@ -17,14 +17,12 @@
package org.apache.inlong.manager.service.core.operationlog;
-import java.util.Date;
-import java.util.Optional;
-import javax.servlet.http.HttpServletRequest;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.pojo.user.UserDetail;
-import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.LoginUserUtils;
import org.apache.inlong.manager.common.util.NetworkUtils;
import org.apache.inlong.manager.dao.entity.OperationLogEntity;
@@ -33,6 +31,10 @@ import
org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
+import javax.servlet.http.HttpServletRequest;
+import java.util.Date;
+import java.util.Optional;
+
/**
* Operation of log aspect
*/
@@ -40,6 +42,7 @@ import
org.springframework.web.context.request.ServletRequestAttributes;
public class OperationLogRecorder {
private static final String ANONYMOUS_USER = "AnonymousUser";
+ private static final Gson GSON = new GsonBuilder().create(); // thread safe
/**
* Save operation logs of all Controller
@@ -60,8 +63,8 @@ public class OperationLogRecorder {
String requestUrl = request.getRequestURI();
String httpMethod = request.getMethod();
String remoteAddress = NetworkUtils.getClientIpAddress(request);
- String param = JsonUtils.toJson(request.getParameterMap());
- String body = JsonUtils.toJson(joinPoint.getArgs());
+ String param = GSON.toJson(request.getParameterMap());
+ String body = GSON.toJson(joinPoint.getArgs());
OperationType operationType = operationLog.operation();
@@ -92,9 +95,9 @@ public class OperationLogRecorder {
if (operationLog.db()) {
OperationLogPool.publish(operationLogEntity);
} else if (success) {
- log.info("operation log: {}",
JsonUtils.toJson(operationLogEntity));
+ log.info("operation log: {}", GSON.toJson(operationLogEntity));
} else {
- log.error("request handle failed : {}",
JsonUtils.toJson(operationLogEntity));
+ log.error("request handle failed : {}",
GSON.toJson(operationLogEntity));
}
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
index 626cb4d4a..492c44f2f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
@@ -17,13 +17,13 @@
package org.apache.inlong.manager.service.sort;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.sort.util.DataFlowUtils;
@@ -57,6 +57,8 @@ public class PushSortConfigListener implements
SortOperateListener {
private StreamSinkService streamSinkService;
@Autowired
private DataFlowUtils dataFlowUtils;
+ @Autowired
+ private ObjectMapper objectMapper;
@Override
public TaskEvent event() {
@@ -86,13 +88,9 @@ public class PushSortConfigListener implements
SortOperateListener {
LOGGER.debug("sink info: {}", sinkResponse);
}
- DataFlowInfo dataFlowInfo =
dataFlowUtils.createDataFlow(groupInfo, sinkResponse);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("try to push config to sort: {}",
JsonUtils.toJson(dataFlowInfo));
- }
-
Integer sinkId = sinkResponse.getId();
try {
+ DataFlowInfo dataFlowInfo =
dataFlowUtils.createDataFlow(groupInfo, sinkResponse);
String zkUrl = clusterBean.getZkUrl();
String zkRoot = clusterBean.getZkRoot();
// push data flow info to zk
@@ -100,6 +98,10 @@ public class PushSortConfigListener implements
SortOperateListener {
ZkTools.updateDataFlowInfo(dataFlowInfo, sortClusterName,
sinkId, zkUrl, zkRoot);
// add sink id to zk
ZkTools.addDataFlowToCluster(sortClusterName, sinkId, zkUrl,
zkRoot);
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("success to push config to sort: {}",
objectMapper.writeValueAsString(dataFlowInfo));
+ }
} catch (Exception e) {
LOGGER.error("push sort config to zookeeper failed, sinkId={}
", sinkId, e);
throw new WorkflowListenerException("push sort config to
zookeeper failed: " + e.getMessage());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinitionRegister.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinitionRegister.java
index 7f963f1aa..2933e0113 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinitionRegister.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinitionRegister.java
@@ -18,32 +18,32 @@
package org.apache.inlong.manager.service.workflow;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.workflow.core.WorkflowEngine;
+import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.List;
-@Service
@Slf4j
+@Service
public class WorkflowDefinitionRegister {
@Autowired
- private WorkflowEngine workflowEngine;
+ private ProcessDefinitionService processDefService;
@Autowired
private List<WorkflowDefinition> workflowDefinitions;
@PostConstruct
- public void registerDefinition() {
- workflowDefinitions.forEach(definition -> {
+ public void registerWorkflowDefinition() {
+ for (WorkflowDefinition definition : workflowDefinitions) {
try {
-
workflowEngine.processDefinitionService().register(definition.defineProcess());
+ processDefService.register(definition.defineProcess());
log.info("success register workflow definition: {}",
definition.getProcessName());
} catch (Exception e) {
log.error("failed to register workflow definition {}",
definition.getProcessName(), e);
}
- });
+ }
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowEngineConfig.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowEngineConfig.java
deleted file mode 100644
index bc122e500..000000000
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowEngineConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
-import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
-import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
-import org.apache.inlong.manager.workflow.WorkflowConfig;
-import org.apache.inlong.manager.workflow.core.EventListenerService;
-import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
-import org.apache.inlong.manager.workflow.core.WorkflowEngine;
-import org.apache.inlong.manager.workflow.core.WorkflowQueryService;
-import
org.apache.inlong.manager.workflow.core.impl.MemoryProcessDefinitionRepository;
-import org.apache.inlong.manager.workflow.core.impl.WorkflowEngineImpl;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.PlatformTransactionManager;
-
-/**
- * Workflow engine config
- */
-@Component
-@Configuration
-@Slf4j
-public class WorkflowEngineConfig {
-
- @Autowired
- private WorkflowQueryService queryService;
- @Autowired
- private MemoryProcessDefinitionRepository memoryProcessRepository;
- @Autowired
- private WorkflowProcessEntityMapper processEntityMapper;
- @Autowired
- private WorkflowTaskEntityMapper taskEntityMapper;
- @Autowired
- private WorkflowEventLogEntityMapper eventLogMapper;
- @Autowired
- private PlatformTransactionManager platformTransactionManager;
-
- @Bean
- public WorkflowEngine workflowEngine() {
- WorkflowConfig workflowConfig = new WorkflowConfig()
- .setQueryService(queryService)
- .setProcessEntityMapper(processEntityMapper)
- .setTaskEntityMapper(taskEntityMapper)
- .setEventLogMapper(eventLogMapper)
- .setDefinitionRepository(memoryProcessRepository)
- .setTransactionManager(platformTransactionManager);
-
- return new WorkflowEngineImpl(workflowConfig);
- }
-
- @Bean
- public EventListenerService eventListenerService(WorkflowEngine
workflowEngine) {
- return workflowEngine.eventListenerService();
- }
-
- @Bean
- public ProcessDefinitionService processDefinitionService(WorkflowEngine
workflowEngine) {
- return workflowEngine.processDefinitionService();
- }
-
-}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
index b10cb13c2..9c1cdc180 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.workflow;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
@@ -43,7 +44,9 @@ import
org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
import
org.apache.inlong.manager.service.workflow.WorkflowExecuteLog.ListenerExecutorLog;
import
org.apache.inlong.manager.service.workflow.WorkflowExecuteLog.TaskExecutorLog;
import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.core.WorkflowEngine;
+import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
+import org.apache.inlong.manager.workflow.core.ProcessService;
+import org.apache.inlong.manager.workflow.core.TaskService;
import org.apache.inlong.manager.workflow.core.WorkflowQueryService;
import org.apache.inlong.manager.workflow.definition.UserTask;
import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
@@ -70,51 +73,56 @@ public class WorkflowServiceImpl implements WorkflowService
{
private static final Logger LOGGER =
LoggerFactory.getLogger(WorkflowServiceImpl.class);
- @Autowired
- private WorkflowEngine workflowEngine;
-
@Autowired
private WorkflowQueryService queryService;
+ @Autowired
+ private ProcessDefinitionService processDefService;
+ @Autowired
+ private ProcessService processService;
+ @Autowired
+ private TaskService taskService;
+ @Autowired
+ private ObjectMapper objectMapper;
@Override
@Transactional(noRollbackFor = WorkflowNoRollbackException.class,
rollbackFor = Exception.class)
public WorkflowResult start(ProcessName process, String applicant,
ProcessForm form) {
- WorkflowContext context =
workflowEngine.processService().start(process.name(), applicant, form);
+ WorkflowContext context = processService.start(process.name(),
applicant, form);
return WorkflowBeanUtils.result(context);
}
@Override
@Transactional(noRollbackFor = WorkflowNoRollbackException.class,
rollbackFor = Exception.class)
public WorkflowResult cancel(Integer processId, String operator, String
remark) {
- WorkflowContext context =
workflowEngine.processService().cancel(processId, operator, remark);
+ WorkflowContext context = processService.cancel(processId, operator,
remark);
return WorkflowBeanUtils.result(context);
}
@Override
@Transactional(noRollbackFor = WorkflowNoRollbackException.class,
rollbackFor = Exception.class)
public WorkflowResult approve(Integer taskId, String remark, TaskForm
form, String operator) {
- WorkflowContext context = workflowEngine.taskService().approve(taskId,
remark, form, operator);
+ WorkflowContext context = taskService.approve(taskId, remark, form,
operator);
return WorkflowBeanUtils.result(context);
}
@Override
@Transactional(noRollbackFor = WorkflowNoRollbackException.class,
rollbackFor = Exception.class)
public WorkflowResult reject(Integer taskId, String remark, String
operator) {
- WorkflowContext context = workflowEngine.taskService().reject(taskId,
remark, operator);
+ WorkflowContext context = taskService.reject(taskId, remark, operator);
return WorkflowBeanUtils.result(context);
}
@Override
@Transactional(noRollbackFor = WorkflowNoRollbackException.class,
rollbackFor = Exception.class)
public WorkflowResult transfer(Integer taskId, String remark, List<String>
to, String operator) {
- WorkflowContext context =
workflowEngine.taskService().transfer(taskId, remark, to, operator);
+ WorkflowContext context = taskService.transfer(taskId, remark, to,
operator);
return WorkflowBeanUtils.result(context);
}
@Override
@Transactional(noRollbackFor = WorkflowNoRollbackException.class,
rollbackFor = Exception.class)
public WorkflowResult complete(Integer taskId, String remark, String
operator) {
- WorkflowContext context =
workflowEngine.taskService().complete(taskId, remark, operator);
+ WorkflowContext context = taskService.complete(taskId, remark,
operator);
return WorkflowBeanUtils.result(context);
}
@@ -252,13 +260,14 @@ public class WorkflowServiceImpl implements
WorkflowService {
}
private Map<String, Object> getShowInList(WorkflowProcessEntity
processEntity) {
- WorkflowProcess process =
workflowEngine.processDefinitionService().getByName(processEntity.getName());
+ WorkflowProcess process =
processDefService.getByName(processEntity.getName());
if (process == null || process.getFormClass() == null) {
return null;
}
try {
- ProcessForm processForm =
WorkflowFormParserUtils.parseProcessForm(processEntity.getFormData(), process);
+ ProcessForm processForm =
WorkflowFormParserUtils.parseProcessForm(objectMapper,
+ processEntity.getFormData(), process);
assert processForm != null;
return processForm.showInList();
} catch (Exception e) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
index 1996b0589..d9fe25a12 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.workflow.consumption;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.manager.common.pojo.workflow.ProcessDetailResponse;
import
org.apache.inlong.manager.common.pojo.workflow.form.NewConsumptionProcessForm;
import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
@@ -32,21 +33,22 @@ import org.springframework.stereotype.Component;
@Component
public class NewConsumptionProcessDetailHandler implements
ProcessDetailHandler {
+ @Autowired
+ private ObjectMapper objectMapper;
@Autowired
private ProcessDefinitionService processDefinitionService;
@Override
- public ProcessDetailResponse handle(ProcessDetailResponse
workflowResponse) {
- WorkflowProcess process =
processDefinitionService.getByName(workflowResponse.getWorkflow().getName());
-
+ public ProcessDetailResponse handle(ProcessDetailResponse processResponse)
{
+ WorkflowProcess process =
processDefinitionService.getByName(processResponse.getWorkflow().getName());
NewConsumptionProcessForm processForm = WorkflowFormParserUtils
-
.parseProcessForm(workflowResponse.getProcessInfo().getFormData().toString(),
process);
+ .parseProcessForm(objectMapper,
processResponse.getProcessInfo().getFormData().toString(), process);
if (processForm == null) {
- return workflowResponse;
+ return processResponse;
}
- workflowResponse.getProcessInfo().setFormData(processForm);
- return workflowResponse;
+ processResponse.getProcessInfo().setFormData(processForm);
+ return processResponse;
}
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
index cd1a27a52..115267879 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
@@ -130,7 +130,7 @@ public class DisableZkForSortTest extends
WorkflowServiceImplTest {
createHiveSink(streamInfo);
createKafkaSource(streamInfo);
mockTaskListenerFactory();
- WorkflowContext context =
workflowEngine.processService().start(processName.name(), applicant, form);
+ WorkflowContext context = processService.start(processName.name(),
applicant, form);
WorkflowResult result = WorkflowBeanUtils.result(context);
ProcessResponse response = result.getProcessInfo();
Assert.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
@@ -161,8 +161,7 @@ public class DisableZkForSortTest extends
WorkflowServiceImplTest {
form.setGroupOperateType(GroupOperateType.SUSPEND);
taskListenerFactory.acceptPlugin(new MockPlugin());
- WorkflowContext context = workflowEngine.processService()
- .start(ProcessName.SUSPEND_GROUP_PROCESS.name(), applicant,
form);
+ WorkflowContext context =
processService.start(ProcessName.SUSPEND_GROUP_PROCESS.name(), applicant, form);
WorkflowResult result = WorkflowBeanUtils.result(context);
ProcessResponse response = result.getProcessInfo();
Assert.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
index 684d4b5ff..8529d88ac 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
@@ -64,7 +64,7 @@ public class DataSourceListenerTest extends
WorkflowServiceImplTest {
sourceRequest.setSourceName("binlog-collect");
return streamSourceService.save(sourceRequest, OPERATOR);
}
-
+
/**
* There will be concurrency problems in the overall operation,This method
temporarily fails the test
*/
@@ -81,8 +81,7 @@ public class DataSourceListenerTest extends
WorkflowServiceImplTest {
form = new GroupResourceProcessForm();
form.setGroupInfo(groupInfo);
form.setGroupOperateType(GroupOperateType.SUSPEND);
- WorkflowContext context = workflowEngine.processService()
- .start(ProcessName.SUSPEND_GROUP_PROCESS.name(), applicant,
form);
+ WorkflowContext context =
processService.start(ProcessName.SUSPEND_GROUP_PROCESS.name(), applicant, form);
WorkflowResult result = WorkflowBeanUtils.result(context);
ProcessResponse response = result.getProcessInfo();
Assert.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
@@ -112,8 +111,7 @@ public class DataSourceListenerTest extends
WorkflowServiceImplTest {
form = new GroupResourceProcessForm();
form.setGroupInfo(groupInfo);
form.setGroupOperateType(GroupOperateType.RESTART);
- WorkflowContext context = workflowEngine.processService()
- .start(ProcessName.RESTART_GROUP_PROCESS.name(), applicant,
form);
+ WorkflowContext context =
processService.start(ProcessName.RESTART_GROUP_PROCESS.name(), applicant, form);
WorkflowResult result = WorkflowBeanUtils.result(context);
ProcessResponse response = result.getProcessInfo();
Assert.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index 21b946359..6bda5b50e 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -50,7 +50,7 @@ import
org.apache.inlong.manager.service.resource.SinkResourceListener;
import org.apache.inlong.manager.service.sort.PushSortConfigListener;
import
org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.core.WorkflowEngine;
+import org.apache.inlong.manager.workflow.core.ProcessService;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
import org.apache.inlong.manager.workflow.definition.WorkflowTask;
@@ -95,7 +95,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
@Autowired
protected WorkflowServiceImpl workflowService;
@Autowired
- protected WorkflowEngine workflowEngine;
+ protected ProcessService processService;
@Autowired
protected InlongGroupService groupService;
@Autowired
@@ -251,7 +251,7 @@ public class WorkflowServiceImplTest extends
ServiceBaseTest {
public void testStartCreatePulsarWorkflow() {
initGroupForm(MQType.PULSAR.getType(), "test14" + subType);
mockTaskListenerFactory();
- WorkflowContext context =
workflowEngine.processService().start(processName.name(), applicant, form);
+ WorkflowContext context = processService.start(processName.name(),
applicant, form);
WorkflowResult result = WorkflowBeanUtils.result(context);
ProcessResponse view = result.getProcessInfo();
// This method temporarily fails the test, so comment it out first
@@ -271,7 +271,7 @@ public class WorkflowServiceImplTest extends
ServiceBaseTest {
public void testStartCreateTubeWorkflow() {
initGroupForm(MQType.TUBE.getType(), "test10" + subType);
mockTaskListenerFactory();
- WorkflowContext context =
workflowEngine.processService().start(processName.name(), applicant, form);
+ WorkflowContext context = processService.start(processName.name(),
applicant, form);
WorkflowResult result = WorkflowBeanUtils.result(context);
ProcessResponse response = result.getProcessInfo();
Assert.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
@@ -297,7 +297,7 @@ public class WorkflowServiceImplTest extends
ServiceBaseTest {
form.setGroupOperateType(GroupOperateType.SUSPEND);
taskListenerFactory.acceptPlugin(new MockPlugin());
- WorkflowContext context = workflowEngine.processService()
+ WorkflowContext context = processService
.start(ProcessName.SUSPEND_GROUP_PROCESS.name(), applicant,
form);
WorkflowResult result = WorkflowBeanUtils.result(context);
ProcessResponse response = result.getProcessInfo();
@@ -332,7 +332,7 @@ public class WorkflowServiceImplTest extends
ServiceBaseTest {
form.setGroupOperateType(GroupOperateType.RESTART);
taskListenerFactory.acceptPlugin(new MockPlugin());
- WorkflowContext context = workflowEngine.processService()
+ WorkflowContext context = processService
.start(ProcessName.RESTART_GROUP_PROCESS.name(), applicant,
form);
WorkflowResult result = WorkflowBeanUtils.result(context);
ProcessResponse response = result.getProcessInfo();
@@ -368,7 +368,7 @@ public class WorkflowServiceImplTest extends
ServiceBaseTest {
form.setGroupOperateType(GroupOperateType.DELETE);
taskListenerFactory.acceptPlugin(new MockPlugin());
- WorkflowContext context = workflowEngine.processService()
+ WorkflowContext context = processService
.start(ProcessName.DELETE_GROUP_PROCESS.name(), applicant,
form);
WorkflowResult result = WorkflowBeanUtils.result(context);
ProcessResponse view = result.getProcessInfo();
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
index 12938d63d..704d2bc0b 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
@@ -20,6 +20,8 @@ package org.apache.inlong.manager.workflow.core;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.exceptions.WorkflowNoRollbackException;
import
org.apache.inlong.manager.common.exceptions.WorkflowRollbackOnceException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
@@ -34,13 +36,11 @@ import java.lang.reflect.UndeclaredThrowableException;
* Transaction Helper
*/
@Slf4j
+@Service
public class TransactionHelper {
- private final PlatformTransactionManager transactionManager;
-
- public TransactionHelper(PlatformTransactionManager transactionManager) {
- this.transactionManager = transactionManager;
- }
+ @Autowired
+ private PlatformTransactionManager transactionManager;
/**
* Execute in transaction
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/WorkflowEngine.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/WorkflowEngine.java
deleted file mode 100644
index 12aed2a97..000000000
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/WorkflowEngine.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.workflow.core;
-
-/**
- * Workflow engine
- */
-public interface WorkflowEngine {
-
- /**
- * Get process definition service
- *
- * @return WorkflowProcess definition service
- */
- ProcessDefinitionService processDefinitionService();
-
- /**
- * Get process instance service
- *
- * @return Process instance service
- */
- ProcessService processService();
-
- /**
- * Obtain task processing services
- *
- * @return task services
- */
- TaskService taskService();
-
- /**
- * Get event listener service
- *
- * @return event listener service
- */
- EventListenerService eventListenerService();
-
-}
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/EventListenerServiceImpl.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/EventListenerServiceImpl.java
index 27d33da9f..9762bc40c 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/EventListenerServiceImpl.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/EventListenerServiceImpl.java
@@ -28,29 +28,35 @@ import
org.apache.inlong.manager.workflow.core.WorkflowContextBuilder;
import org.apache.inlong.manager.workflow.definition.Element;
import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
import org.apache.inlong.manager.workflow.definition.WorkflowTask;
-import org.apache.inlong.manager.workflow.event.EventListenerManagerFactory;
import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
+import
org.apache.inlong.manager.workflow.event.process.ProcessEventListenerManager;
+import org.apache.inlong.manager.workflow.event.process.ProcessEventNotifier;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
+import org.apache.inlong.manager.workflow.event.task.TaskEventListenerManager;
+import org.apache.inlong.manager.workflow.event.task.TaskEventNotifier;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
/**
* Event listener service
*/
+@Service
public class EventListenerServiceImpl implements EventListenerService {
- private final WorkflowContextBuilder workflowContextBuilder;
- private final WorkflowEventNotifier workflowEventNotifier;
- private final EventListenerManagerFactory listenerManagerFactory;
- private final WorkflowEventLogEntityMapper eventLogMapper;
-
- public EventListenerServiceImpl(WorkflowContextBuilder contextBuilder,
WorkflowEventNotifier eventNotifier,
- EventListenerManagerFactory listenerManagerFactory,
WorkflowEventLogEntityMapper eventLogMapper) {
- this.workflowContextBuilder = contextBuilder;
- this.workflowEventNotifier = eventNotifier;
- this.listenerManagerFactory = listenerManagerFactory;
- this.eventLogMapper = eventLogMapper;
- }
+ @Autowired
+ private WorkflowContextBuilder workflowContextBuilder;
+ @Autowired
+ private ProcessEventNotifier processEventNotifier;
+ @Autowired
+ private TaskEventNotifier taskEventNotifier;
+ @Autowired
+ private ProcessEventListenerManager processListenerManager;
+ @Autowired
+ private TaskEventListenerManager taskListenerManager;
+ @Autowired
+ private WorkflowEventLogEntityMapper eventLogMapper;
@Override
public void executeEventListener(Integer eventLogId) {
@@ -75,7 +81,7 @@ public class EventListenerServiceImpl implements
EventListenerService {
ProcessEvent processEvent =
getProcessEventListener(context.getProcess(), listener).event();
context.setCurrentElement(getCurrentElement(context.getProcess(),
processEvent));
- workflowEventNotifier.getProcessEventNotifier().notify(listener, true,
context);
+ processEventNotifier.notify(listener, true, context);
}
@Override
@@ -83,21 +89,21 @@ public class EventListenerServiceImpl implements
EventListenerService {
WorkflowContext context =
workflowContextBuilder.buildContextForTask(taskId, null);
TaskEventListener eventListener = getTaskEventListener((WorkflowTask)
context.getCurrentElement(), listener);
context.getActionContext().setAction(WorkflowAction.fromTaskEvent(eventListener.event()));
- workflowEventNotifier.getTaskEventNotifier().notify(listener, true,
context);
+ taskEventNotifier.notify(listener, true, context);
}
@Override
public void triggerProcessEvent(Integer processId, ProcessEvent
processEvent) {
WorkflowContext context =
workflowContextBuilder.buildContextForProcess(processId);
context.setCurrentElement(getCurrentElement(context.getProcess(),
processEvent));
- workflowEventNotifier.getProcessEventNotifier().notify(processEvent,
context);
+ processEventNotifier.notify(processEvent, context);
}
@Override
public void triggerTaskEvent(Integer taskId, TaskEvent taskEvent) {
WorkflowContext context = workflowContextBuilder
.buildContextForTask(taskId,
WorkflowAction.fromTaskEvent(taskEvent));
- workflowEventNotifier.getTaskEventNotifier().notify(taskEvent,
context);
+ taskEventNotifier.notify(taskEvent, context);
}
private Element getCurrentElement(WorkflowProcess process, ProcessEvent
processEvent) {
@@ -113,7 +119,7 @@ public class EventListenerServiceImpl implements
EventListenerService {
return listener;
}
- listener =
listenerManagerFactory.getProcessListenerManager().listener(listenerName);
+ listener = processListenerManager.listener(listenerName);
Preconditions.checkNotNull(listener, "process listener not exist with
name: " + listenerName);
return listener;
}
@@ -124,7 +130,7 @@ public class EventListenerServiceImpl implements
EventListenerService {
return listener;
}
- listener =
listenerManagerFactory.getTaskListenerManager().listener(listenerName);
+ listener = taskListenerManager.listener(listenerName);
Preconditions.checkNotNull(listener, "task listener not exist with
name: " + listenerName);
return listener;
}
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessDefinitionServiceImpl.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessDefinitionServiceImpl.java
index 632d5910c..38bb8a68a 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessDefinitionServiceImpl.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessDefinitionServiceImpl.java
@@ -21,17 +21,17 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.workflow.core.ProcessDefinitionRepository;
import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
/**
* WorkflowProcess definition service
*/
+@Service
public class ProcessDefinitionServiceImpl implements ProcessDefinitionService {
- private final ProcessDefinitionRepository definitionRepository;
-
- public ProcessDefinitionServiceImpl(ProcessDefinitionRepository
definitionRepository) {
- this.definitionRepository = definitionRepository;
- }
+ @Autowired
+ private ProcessDefinitionRepository definitionRepository;
@Override
public void register(WorkflowProcess process) {
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessServiceImpl.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessServiceImpl.java
index 303ab3151..1c67caf1e 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessServiceImpl.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessServiceImpl.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.workflow.core.impl;
import org.apache.inlong.manager.common.enums.TaskStatus;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
@@ -26,28 +27,24 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.core.ProcessService;
import org.apache.inlong.manager.workflow.core.ProcessorExecutor;
import org.apache.inlong.manager.workflow.core.WorkflowContextBuilder;
-import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.workflow.definition.WorkflowTask;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import java.util.List;
/**
* WorkflowProcess service
*/
+@Service
public class ProcessServiceImpl implements ProcessService {
- private final ProcessorExecutor processorExecutor;
- private final WorkflowContextBuilder workflowContextBuilder;
- private final WorkflowTaskEntityMapper taskEntityMapper;
-
- public ProcessServiceImpl(
- ProcessorExecutor processorExecutor,
- WorkflowContextBuilder workflowContextBuilder,
- WorkflowTaskEntityMapper taskEntityMapper) {
- this.processorExecutor = processorExecutor;
- this.workflowContextBuilder = workflowContextBuilder;
- this.taskEntityMapper = taskEntityMapper;
- }
+ @Autowired
+ private ProcessorExecutor processorExecutor;
+ @Autowired
+ private WorkflowContextBuilder workflowContextBuilder;
+ @Autowired
+ private WorkflowTaskEntityMapper taskEntityMapper;
@Override
public WorkflowContext start(String name, String applicant, ProcessForm
form) {
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
index 694044946..bf9c055f3 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
@@ -23,8 +23,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
import org.apache.inlong.manager.common.exceptions.WorkflowNoRollbackException;
import
org.apache.inlong.manager.common.exceptions.WorkflowRollbackOnceException;
-import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
-import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.core.ProcessorExecutor;
import org.apache.inlong.manager.workflow.core.TransactionHelper;
@@ -38,44 +36,46 @@ import
org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor;
import org.apache.inlong.manager.workflow.processor.SkipableElementProcessor;
import org.apache.inlong.manager.workflow.processor.StartEventProcessor;
import org.apache.inlong.manager.workflow.processor.UserTaskProcessor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionCallback;
+import javax.annotation.PostConstruct;
import java.util.List;
/**
* Workload component processor execution
*/
@Slf4j
+@Service
public class ProcessorExecutorImpl implements ProcessorExecutor {
- private final ImmutableMap<Class<? extends Element>, ElementProcessor<?
extends Element>> elementProcessor;
-
- private final TransactionHelper transactionHelper;
-
- public ProcessorExecutorImpl(
- WorkflowProcessEntityMapper processEntityMapper,
- WorkflowTaskEntityMapper taskEntityMapper,
- WorkflowEventNotifier workflowEventNotifier,
- TransactionHelper transactionHelper) {
+ private ImmutableMap<Class<? extends Element>, ElementProcessor<? extends
Element>> elementProcessor;
+
+ @Autowired
+ private TransactionHelper transactionHelper;
+ @Autowired
+ private StartEventProcessor startEventProcessor;
+ @Autowired
+ private EndEventProcessor endEventProcessor;
+ @Autowired
+ private UserTaskProcessor userTaskProcessor;
+ @Autowired
+ private ServiceTaskProcessor serviceTaskProcessor;
+
+ @PostConstruct
+ private void initProcessors() {
+ List<ElementProcessor<?>> processors = Lists.newArrayList();
+ processors.add(startEventProcessor);
+ processors.add(endEventProcessor);
+ processors.add(userTaskProcessor);
+ processors.add(serviceTaskProcessor);
- List<ElementProcessor<? extends Element>> processors =
initProcessors(processEntityMapper,
- taskEntityMapper, workflowEventNotifier);
ImmutableMap.Builder<Class<? extends Element>, ElementProcessor<?
extends Element>> builder
= ImmutableMap.builder();
processors.forEach(processor -> builder.put(processor.watch(),
processor));
elementProcessor = builder.build();
- this.transactionHelper = transactionHelper;
- }
-
- private List<ElementProcessor<?>>
initProcessors(WorkflowProcessEntityMapper processEntityMapper,
- WorkflowTaskEntityMapper taskEntityMapper, WorkflowEventNotifier
eventNotifier) {
- List<ElementProcessor<?>> processors = Lists.newArrayList();
- processors.add(new StartEventProcessor(processEntityMapper,
eventNotifier));
- processors.add(new EndEventProcessor(processEntityMapper,
taskEntityMapper, eventNotifier));
- processors.add(new UserTaskProcessor(taskEntityMapper, eventNotifier));
- processors.add(new ServiceTaskProcessor(taskEntityMapper,
eventNotifier));
- return processors;
}
private ElementProcessor<? extends Element> getProcessor(Class<? extends
Element> elementClazz) {
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/TaskServiceImpl.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/TaskServiceImpl.java
index 9f2c8c116..397c2a9ae 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/TaskServiceImpl.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/TaskServiceImpl.java
@@ -17,28 +17,27 @@
package org.apache.inlong.manager.workflow.core.impl;
+import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
import org.apache.inlong.manager.workflow.WorkflowAction;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.core.ProcessorExecutor;
import org.apache.inlong.manager.workflow.core.TaskService;
import org.apache.inlong.manager.workflow.core.WorkflowContextBuilder;
-import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import java.util.List;
/**
* WorkflowTask service
*/
+@Service
public class TaskServiceImpl implements TaskService {
- private final ProcessorExecutor processorExecutor;
- private final WorkflowContextBuilder contextBuilder;
-
- public TaskServiceImpl(ProcessorExecutor processorExecutor,
- WorkflowContextBuilder contextBuilder) {
- this.processorExecutor = processorExecutor;
- this.contextBuilder = contextBuilder;
- }
+ @Autowired
+ private ProcessorExecutor processorExecutor;
+ @Autowired
+ private WorkflowContextBuilder contextBuilder;
@Override
public WorkflowContext approve(Integer taskId, String remark, TaskForm
form, String operator) {
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowContextBuilderImpl.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowContextBuilderImpl.java
index 4f5caad25..e62b9ec98 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowContextBuilderImpl.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowContextBuilderImpl.java
@@ -17,10 +17,15 @@
package org.apache.inlong.manager.workflow.core.impl;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.exceptions.JsonException;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
import org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
@@ -30,11 +35,11 @@ import org.apache.inlong.manager.workflow.WorkflowAction;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.core.ProcessDefinitionRepository;
import org.apache.inlong.manager.workflow.core.WorkflowContextBuilder;
-import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
import org.apache.inlong.manager.workflow.definition.WorkflowTask;
import org.apache.inlong.manager.workflow.util.WorkflowFormParserUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
@@ -42,19 +47,18 @@ import java.util.Map;
/**
* Workflow context builder
*/
+@Slf4j
+@Service
public class WorkflowContextBuilderImpl implements WorkflowContextBuilder {
- private final ProcessDefinitionRepository definitionRepository;
- private final WorkflowProcessEntityMapper processEntityMapper;
- private final WorkflowTaskEntityMapper taskEntityMapper;
-
- public WorkflowContextBuilderImpl(ProcessDefinitionRepository
definitionRepository,
- WorkflowProcessEntityMapper processEntityMapper,
- WorkflowTaskEntityMapper taskEntityMapper) {
- this.definitionRepository = definitionRepository;
- this.processEntityMapper = processEntityMapper;
- this.taskEntityMapper = taskEntityMapper;
- }
+ @Autowired
+ private ObjectMapper objectMapper;
+ @Autowired
+ private ProcessDefinitionRepository definitionRepository;
+ @Autowired
+ private WorkflowProcessEntityMapper processEntityMapper;
+ @Autowired
+ private WorkflowTaskEntityMapper taskEntityMapper;
@SneakyThrows
@Override
@@ -78,7 +82,8 @@ public class WorkflowContextBuilderImpl implements
WorkflowContextBuilder {
return new WorkflowContext()
.setOperator(processEntity.getApplicant())
.setProcess(process)
-
.setProcessForm(WorkflowFormParserUtils.parseProcessForm(processEntity.getFormData(),
process))
+ .setProcessForm(
+ WorkflowFormParserUtils.parseProcessForm(objectMapper,
processEntity.getFormData(), process))
.setProcessEntity(processEntity);
}
@@ -104,7 +109,7 @@ public class WorkflowContextBuilderImpl implements
WorkflowContextBuilder {
public WorkflowContext buildContextForTask(Integer taskId, WorkflowAction
action) {
WorkflowTaskEntity taskEntity = taskEntityMapper.selectById(taskId);
WorkflowProcess process =
definitionRepository.get(taskEntity.getProcessName()).clone();
- TaskForm taskForm = WorkflowFormParserUtils.parseTaskForm(taskEntity,
process);
+ TaskForm taskForm =
WorkflowFormParserUtils.parseTaskForm(objectMapper, taskEntity, process);
List<String> transferToUsers =
getTransferToUsers(taskEntity.getExtParams());
return buildContextForTask(taskId, action, taskForm, transferToUsers,
taskEntity.getRemark(),
taskEntity.getOperator());
@@ -118,7 +123,8 @@ public class WorkflowContextBuilderImpl implements
WorkflowContextBuilder {
WorkflowProcessEntity processEntity =
processEntityMapper.selectById(taskEntity.getProcessId());
WorkflowProcess process =
definitionRepository.get(processEntity.getName()).clone();
- ProcessForm processForm =
WorkflowFormParserUtils.parseProcessForm(processEntity.getFormData(), process);
+ ProcessForm processForm =
WorkflowFormParserUtils.parseProcessForm(objectMapper,
processEntity.getFormData(),
+ process);
WorkflowTask task = process.getTaskByName(taskEntity.getName());
return new WorkflowContext().setProcess(process)
@@ -126,29 +132,34 @@ public class WorkflowContextBuilderImpl implements
WorkflowContextBuilder {
.setProcessForm(processForm)
.setProcessEntity(processEntity)
.setCurrentElement(task)
- .setActionContext(
- new WorkflowContext.ActionContext()
- .setAction(action)
- .setTaskEntity(taskEntity)
- .setTask(task)
- .setForm(taskForm)
- .setTransferToUsers(transferToUsers)
- .setOperator(operator)
- .setRemark(remark)
+ .setActionContext(new WorkflowContext.ActionContext()
+ .setAction(action)
+ .setTaskEntity(taskEntity)
+ .setTask(task)
+ .setForm(taskForm)
+ .setTransferToUsers(transferToUsers)
+ .setOperator(operator)
+ .setRemark(remark)
);
}
+ @SuppressWarnings("unchecked")
private List<String> getTransferToUsers(String ext) {
if (StringUtils.isEmpty(ext)) {
return Lists.newArrayList();
}
- Map<String, Object> extMap = JsonUtils.parseMap(ext, String.class,
Object.class);
- if (!extMap.containsKey(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY)) {
- return Lists.newArrayList();
- }
-
- if (extMap.get(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY) instanceof
List) {
- return (List<String>)
extMap.get(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY);
+ try {
+ Map<String, Object> extMap = objectMapper.readValue(ext,
+ objectMapper.getTypeFactory().constructMapType(Map.class,
String.class, Object.class));
+ if (!extMap.containsKey(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY))
{
+ return Lists.newArrayList();
+ }
+ if (extMap.get(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY)
instanceof List) {
+ return (List<String>)
extMap.get(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY);
+ }
+ } catch (JsonProcessingException e) {
+ log.error("parse transfer users error: ", e);
+ throw new JsonException("parse transfer users error");
}
return null;
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowEngineImpl.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowEngineImpl.java
deleted file mode 100644
index e4f6f3869..000000000
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowEngineImpl.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.workflow.core.impl;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
-import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
-import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
-import org.apache.inlong.manager.workflow.WorkflowConfig;
-import org.apache.inlong.manager.workflow.core.EventListenerService;
-import org.apache.inlong.manager.workflow.core.ProcessDefinitionRepository;
-import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
-import org.apache.inlong.manager.workflow.core.ProcessService;
-import org.apache.inlong.manager.workflow.core.ProcessorExecutor;
-import org.apache.inlong.manager.workflow.core.TaskService;
-import org.apache.inlong.manager.workflow.core.TransactionHelper;
-import org.apache.inlong.manager.workflow.core.WorkflowContextBuilder;
-import org.apache.inlong.manager.workflow.core.WorkflowEngine;
-import org.apache.inlong.manager.workflow.event.EventListenerManagerFactory;
-
-/**
- * Workflow engine
- */
-@Slf4j
-public class WorkflowEngineImpl implements WorkflowEngine {
-
- private final ProcessDefinitionService processDefService;
-
- private final ProcessService processService;
-
- private final TaskService taskService;
-
- private final EventListenerService eventListenerService;
-
- /**
- * Construct WorkflowConfig instance
- */
- public WorkflowEngineImpl(WorkflowConfig workflowConfig) {
- log.info("start init workflow engine with config: {}",
JsonUtils.toJson(workflowConfig));
-
- // Database transaction assistant
- TransactionHelper transactionHelper = new
TransactionHelper(workflowConfig.getTransactionManager());
-
- // Workflow event listener manager
- EventListenerManagerFactory listenerManagerFactory = new
EventListenerManagerFactory(workflowConfig);
-
- // Workflow event notifier
- WorkflowEventNotifier workflowEventNotifier = new
WorkflowEventNotifier(listenerManagerFactory);
-
- // Workflow component executor
- WorkflowProcessEntityMapper processEntityMapper =
workflowConfig.getProcessEntityMapper();
- WorkflowTaskEntityMapper taskEntityMapper =
workflowConfig.getTaskEntityMapper();
- ProcessorExecutor processorExecutor = new
ProcessorExecutorImpl(processEntityMapper,
- taskEntityMapper, workflowEventNotifier, transactionHelper);
-
- ProcessDefinitionRepository processDefRepository =
workflowConfig.getDefinitionRepository();
-
- // Workflow context builder
- WorkflowContextBuilder contextBuilder = new WorkflowContextBuilderImpl(
- processDefRepository, processEntityMapper, taskEntityMapper);
-
- // Workflow process definition service
- this.processDefService = new
ProcessDefinitionServiceImpl(processDefRepository);
-
- // Workflow process instance service
- this.processService = new ProcessServiceImpl(processorExecutor,
contextBuilder, taskEntityMapper);
-
- // Workflow task service
- this.taskService = new TaskServiceImpl(processorExecutor,
contextBuilder);
-
- // Workflow event listener service
- WorkflowEventLogEntityMapper eventLogMapper =
workflowConfig.getEventLogMapper();
- this.eventListenerService = new
EventListenerServiceImpl(contextBuilder, workflowEventNotifier,
- listenerManagerFactory, eventLogMapper);
-
- log.info("success init workflow engine");
- }
-
- @Override
- public ProcessDefinitionService processDefinitionService() {
- return processDefService;
- }
-
- @Override
- public ProcessService processService() {
- return processService;
- }
-
- @Override
- public TaskService taskService() {
- return taskService;
- }
-
- @Override
- public EventListenerService eventListenerService() {
- return eventListenerService;
- }
-
-}
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowEventNotifier.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowEventNotifier.java
deleted file mode 100644
index 9fd52f0de..000000000
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowEventNotifier.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.workflow.core.impl;
-
-import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
-import org.apache.inlong.manager.workflow.event.EventListenerManagerFactory;
-import org.apache.inlong.manager.workflow.event.process.ProcessEventNotifier;
-import org.apache.inlong.manager.workflow.event.task.TaskEventNotifier;
-
-/**
- * Workflow event notifier
- */
-public class WorkflowEventNotifier {
-
- private final ProcessEventNotifier processEventNotifier;
-
- private final TaskEventNotifier taskEventNotifier;
-
- public WorkflowEventNotifier(EventListenerManagerFactory factory) {
- WorkflowEventLogEntityMapper eventLogMapper =
factory.getEventLogMapper();
- taskEventNotifier = new
TaskEventNotifier(factory.getTaskListenerManager(), eventLogMapper);
- processEventNotifier = new
ProcessEventNotifier(factory.getProcessListenerManager(), eventLogMapper);
- }
-
- public ProcessEventNotifier getProcessEventNotifier() {
- return processEventNotifier;
- }
-
- public TaskEventNotifier getTaskEventNotifier() {
- return taskEventNotifier;
- }
-
-}
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
index 60be006c5..55b6c16f6 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.workflow.core.impl;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.ProcessStatus;
@@ -35,6 +36,7 @@ import
org.apache.inlong.manager.common.pojo.workflow.TaskQuery;
import org.apache.inlong.manager.common.pojo.workflow.TaskResponse;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverQuery;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowBriefDTO;
+import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
import org.apache.inlong.manager.dao.entity.WorkflowApproverEntity;
import org.apache.inlong.manager.dao.entity.WorkflowEventLogEntity;
import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
@@ -49,7 +51,6 @@ import
org.apache.inlong.manager.workflow.core.WorkflowQueryService;
import org.apache.inlong.manager.workflow.definition.Element;
import org.apache.inlong.manager.workflow.definition.NextableElement;
import org.apache.inlong.manager.workflow.definition.StartEvent;
-import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
import org.apache.inlong.manager.workflow.definition.UserTask;
import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
import org.apache.inlong.manager.workflow.definition.WorkflowTask;
@@ -81,6 +82,8 @@ public class WorkflowQueryServiceImpl implements
WorkflowQueryService {
private WorkflowEventLogEntityMapper eventLogMapper;
@Autowired
private WorkflowApproverEntityMapper approverMapper;
+ @Autowired
+ private ObjectMapper objectMapper;
@Override
public WorkflowProcessEntity getProcessEntity(Integer processId) {
@@ -240,7 +243,7 @@ public class WorkflowQueryServiceImpl implements
WorkflowQueryService {
elementDTO.setName(startEvent.getName());
elementDTO.setDisplayName(startEvent.getDisplayName());
- WorkflowContext context = WorkflowBeanUtils.buildContext(process,
processEntity);
+ WorkflowContext context = WorkflowBeanUtils.buildContext(objectMapper,
process, processEntity);
addNext(startEvent, elementDTO, context, nameStatusMap);
WorkflowBriefDTO briefDTO = new WorkflowBriefDTO();
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
index c66a8a7e3..91c1060bf 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
@@ -20,13 +20,15 @@ package org.apache.inlong.manager.workflow.definition;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
-import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.workflow.WorkflowAction;
import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.event.EventListener;
import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
import org.springframework.util.CollectionUtils;
@@ -43,6 +45,8 @@ import java.util.stream.Collectors;
@Slf4j
public class ServiceTask extends WorkflowTask {
+ public static final Gson GSON = new GsonBuilder().create();
+
private static final Set<WorkflowAction> SUPPORTED_ACTIONS = ImmutableSet
.of(WorkflowAction.COMPLETE, WorkflowAction.CANCEL,
WorkflowAction.TERMINATE);
@@ -101,7 +105,7 @@ public class ServiceTask extends WorkflowTask {
try {
return (ConditionNextElement) ele.clone();
} catch (CloneNotSupportedException e) {
- e.printStackTrace();
+ log.error("clone service task error: ", e);
}
return null;
}).collect(Collectors.toList())));
@@ -124,9 +128,9 @@ public class ServiceTask extends WorkflowTask {
}
List<TaskEventListener> listeners = Lists.newArrayList(
listenerProvider.get(workflowContext, serviceTaskType));
- log.info("ServiceTask:{} is init for listeners:{}", getName(),
- JsonUtils.toJson(listeners.stream().map(listener ->
listener.name()).collect(
- Collectors.toList())));
+
+ List<String> listenerNames =
listeners.stream().map(EventListener::name).collect(Collectors.toList());
+ log.info("ServiceTask:{} is init for listeners:{}", getName(),
GSON.toJson(listenerNames));
addListeners(listeners);
} else {
log.info("ServiceTask:{} is already init", getName());
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/EventListenerManagerFactory.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/EventListenerManagerFactory.java
deleted file mode 100644
index 84b69d402..000000000
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/EventListenerManagerFactory.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.workflow.event;
-
-import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
-import org.apache.inlong.manager.workflow.WorkflowConfig;
-import
org.apache.inlong.manager.workflow.event.process.ProcessEventListenerManager;
-import org.apache.inlong.manager.workflow.event.task.TaskEventListenerManager;
-
-/**
- * Factory of workflow event listener manager
- */
-public class EventListenerManagerFactory {
-
- private final ProcessEventListenerManager processListenerManager;
- private final TaskEventListenerManager taskListenerManager;
- private final WorkflowEventLogEntityMapper eventLogMapper;
-
- public EventListenerManagerFactory(WorkflowConfig workflowConfig) {
- this.processListenerManager = new ProcessEventListenerManager();
- this.eventLogMapper = workflowConfig.getEventLogMapper();
- this.taskListenerManager = new
TaskEventListenerManager(eventLogMapper);
- }
-
- /**
- * Get process event listener manager.
- */
- public ProcessEventListenerManager getProcessListenerManager() {
- return processListenerManager;
- }
-
- /**
- * Get task event listener manager.
- */
- public TaskEventListenerManager getTaskListenerManager() {
- return taskListenerManager;
- }
-
- /**
- * Get workflow event listener manager.
- */
- public WorkflowEventLogEntityMapper getEventLogMapper() {
- return eventLogMapper;
- }
-
-}
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/LogableEventListener.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/LogableEventListener.java
index cf42e04f4..105543049 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/LogableEventListener.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/LogableEventListener.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.workflow.event;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.EventStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.NetworkUtils;
import org.apache.inlong.manager.dao.entity.WorkflowEventLogEntity;
import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
@@ -63,10 +62,10 @@ public abstract class LogableEventListener<EventType
extends WorkflowEvent> impl
WorkflowEventLogEntity workflowEventLogEntity = buildEventLog(context);
try {
ListenerResult result = eventListener.listen(context);
- log.debug("listener execute result:{} - {}",
workflowEventLogEntity, result);
+ log.debug("listener execute result: {} - {}",
workflowEventLogEntity, result);
return result;
} catch (Exception e) {
- log.error("listener exception:{}", workflowEventLogEntity, e);
+ log.error("execute listener {} error: {}", workflowEventLogEntity,
e);
if (!async()) {
throw new WorkflowListenerException(e);
}
@@ -87,7 +86,7 @@ public abstract class LogableEventListener<EventType extends
WorkflowEvent> impl
} catch (Exception e) {
logEntity.setStatus(EventStatus.FAILED.getStatus());
logEntity.setException(e.getMessage());
- log.error("listener exception:{}", JsonUtils.toJson(logEntity), e);
+ log.error("execute listener {} error: {}", logEntity, e);
if (!async()) {
throw new WorkflowListenerException(e.getMessage());
}
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListenerManager.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListenerManager.java
index ae519911b..ec567362e 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListenerManager.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListenerManager.java
@@ -22,6 +22,8 @@ import com.google.common.collect.Maps;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
import org.apache.inlong.manager.workflow.event.EventListenerManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
@@ -29,6 +31,7 @@ import java.util.Map;
/**
* System default process event listener manager
*/
+@Service
public class ProcessEventListenerManager implements
EventListenerManager<ProcessEvent, ProcessEventListener> {
private static final List<ProcessEventListener> EMPTY =
Lists.newArrayList();
@@ -36,6 +39,7 @@ public class ProcessEventListenerManager implements
EventListenerManager<Process
private final Map<ProcessEvent, List<ProcessEventListener>>
asyncProcessEventListeners = Maps.newHashMap();
private final Map<String, ProcessEventListener> processEventListeners =
Maps.newHashMap();
+ @Autowired
private WorkflowEventLogEntityMapper eventLogMapper;
@Override
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java
index e4050c8b1..ac9cd10ed 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java
@@ -25,6 +25,8 @@ import
org.apache.inlong.manager.workflow.definition.WorkflowProcess;
import org.apache.inlong.manager.workflow.event.EventListenerManager;
import org.apache.inlong.manager.workflow.event.EventListenerNotifier;
import org.apache.inlong.manager.workflow.event.LogableEventListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
@@ -38,6 +40,7 @@ import java.util.function.Consumer;
* WorkflowProcess event notifier
*/
@Slf4j
+@Service
public class ProcessEventNotifier implements
EventListenerNotifier<ProcessEvent> {
private final ExecutorService executorService = new ThreadPoolExecutor(
@@ -49,13 +52,10 @@ public class ProcessEventNotifier implements
EventListenerNotifier<ProcessEvent>
new
ThreadFactoryBuilder().setNameFormat("async-process-event-notifier-%s").build(),
new CallerRunsPolicy());
- private final EventListenerManager<ProcessEvent, ProcessEventListener>
eventListenerManager;
- private final WorkflowEventLogEntityMapper eventLogMapper;
-
- public ProcessEventNotifier(ProcessEventListenerManager manager,
WorkflowEventLogEntityMapper eventLogMapper) {
- this.eventListenerManager = manager;
- this.eventLogMapper = eventLogMapper;
- }
+ @Autowired
+ private EventListenerManager<ProcessEvent, ProcessEventListener>
eventListenerManager;
+ @Autowired
+ private WorkflowEventLogEntityMapper eventLogMapper;
@Override
public void notify(ProcessEvent event, WorkflowContext context) {
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListenerManager.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListenerManager.java
index 8408861fe..0373cc350 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListenerManager.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListenerManager.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Maps;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
import org.apache.inlong.manager.workflow.event.EventListenerManager;
+import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
@@ -29,6 +30,7 @@ import java.util.Map;
/**
* Internal default task listener management
*/
+@Service
public class TaskEventListenerManager implements
EventListenerManager<TaskEvent, TaskEventListener> {
private final Map<TaskEvent, List<TaskEventListener>>
syncTaskEventListeners = Maps.newHashMap();
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java
index 6a3144f2c..3382203c4 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java
@@ -25,6 +25,8 @@ import
org.apache.inlong.manager.workflow.definition.WorkflowTask;
import org.apache.inlong.manager.workflow.event.EventListenerManager;
import org.apache.inlong.manager.workflow.event.EventListenerNotifier;
import org.apache.inlong.manager.workflow.event.LogableEventListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
@@ -38,6 +40,7 @@ import java.util.function.Consumer;
* WorkflowProcess event notifier
*/
@Slf4j
+@Service
public class TaskEventNotifier implements EventListenerNotifier<TaskEvent> {
private final ExecutorService executorService = new ThreadPoolExecutor(
@@ -49,8 +52,10 @@ public class TaskEventNotifier implements
EventListenerNotifier<TaskEvent> {
new
ThreadFactoryBuilder().setNameFormat("async-task-event-notifier-%s").build(),
new CallerRunsPolicy());
- private final EventListenerManager<TaskEvent, TaskEventListener>
eventListenerManager;
- private final WorkflowEventLogEntityMapper eventLogMapper;
+ @Autowired
+ private EventListenerManager<TaskEvent, TaskEventListener>
eventListenerManager;
+ @Autowired
+ private WorkflowEventLogEntityMapper eventLogMapper;
public TaskEventNotifier(TaskEventListenerManager listenerManager,
WorkflowEventLogEntityMapper eventLogMapper) {
this.eventListenerManager = listenerManager;
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractNextableElementProcessor.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractNextableElementProcessor.java
index c79a1fe32..ebfd091fc 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractNextableElementProcessor.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractNextableElementProcessor.java
@@ -32,8 +32,7 @@ import java.util.stream.Collectors;
/**
* Non-terminal element processor
*/
-public abstract class AbstractNextableElementProcessor<T extends
NextableElement> implements
- ElementProcessor<T> {
+public abstract class AbstractNextableElementProcessor<T extends
NextableElement> implements ElementProcessor<T> {
@Override
public List<Element> next(T element, WorkflowContext context) {
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractTaskProcessor.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractTaskProcessor.java
index 64229889d..77051a026 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractTaskProcessor.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractTaskProcessor.java
@@ -26,6 +26,7 @@ import
org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.definition.ApproverAssign;
import org.apache.inlong.manager.workflow.definition.WorkflowTask;
+import org.springframework.beans.factory.annotation.Autowired;
import java.util.Date;
@@ -35,12 +36,9 @@ import java.util.Date;
public abstract class AbstractTaskProcessor<T extends WorkflowTask> extends
AbstractNextableElementProcessor<T> implements
SkipableElementProcessor<T> {
+ @Autowired
protected WorkflowTaskEntityMapper taskEntityMapper;
- protected AbstractTaskProcessor(WorkflowTaskEntityMapper taskEntityMapper)
{
- this.taskEntityMapper = taskEntityMapper;
- }
-
@Override
public void skip(T task, WorkflowContext context) {
WorkflowProcessEntity workflowProcessEntity =
context.getProcessEntity();
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/EndEventProcessor.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/EndEventProcessor.java
index 60c5605cf..9e22967a2 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/EndEventProcessor.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/EndEventProcessor.java
@@ -27,11 +27,12 @@ import
org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
import org.apache.inlong.manager.workflow.WorkflowAction;
import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.core.impl.WorkflowEventNotifier;
import org.apache.inlong.manager.workflow.definition.Element;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.event.process.ProcessEventNotifier;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
@@ -42,19 +43,15 @@ import java.util.List;
* End event handler
*/
@Slf4j
+@Service
public class EndEventProcessor implements ElementProcessor<EndEvent> {
- private final WorkflowProcessEntityMapper processEntityMapper;
- private final WorkflowTaskEntityMapper taskEntityMapper;
- private final ProcessEventNotifier processEventNotifier;
-
- public EndEventProcessor(WorkflowProcessEntityMapper processEntityMapper,
- WorkflowTaskEntityMapper taskEntityMapper,
- WorkflowEventNotifier eventNotifier) {
- this.processEntityMapper = processEntityMapper;
- this.taskEntityMapper = taskEntityMapper;
- this.processEventNotifier = eventNotifier.getProcessEventNotifier();
- }
+ @Autowired
+ private ProcessEventNotifier processEventNotifier;
+ @Autowired
+ private WorkflowTaskEntityMapper taskEntityMapper;
+ @Autowired
+ private WorkflowProcessEntityMapper processEntityMapper;
@Override
public Class<EndEvent> watch() {
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
index fe8a02d7d..c779ba04d 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
@@ -17,17 +17,19 @@
package org.apache.inlong.manager.workflow.processor;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
+import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.TaskStatus;
-import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.exceptions.JsonException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
import org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
import org.apache.inlong.manager.workflow.WorkflowAction;
import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.core.impl.WorkflowEventNotifier;
+import org.apache.inlong.manager.workflow.WorkflowContext.ActionContext;
import org.apache.inlong.manager.workflow.definition.ApproverAssign;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
import org.apache.inlong.manager.workflow.definition.WorkflowTask;
@@ -35,6 +37,8 @@ import
org.apache.inlong.manager.workflow.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.event.process.ProcessEventNotifier;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.apache.inlong.manager.workflow.event.task.TaskEventNotifier;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
@@ -43,6 +47,8 @@ import java.util.Set;
/**
* System task processor
*/
+@Service
+@NoArgsConstructor
public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
private static final Set<WorkflowAction> SUPPORT_ACTIONS = ImmutableSet.of(
@@ -53,14 +59,14 @@ public class ServiceTaskProcessor extends
AbstractTaskProcessor<ServiceTask> {
TaskStatus.PENDING, TaskStatus.FAILED
);
- private final TaskEventNotifier taskEventNotifier;
- private final ProcessEventNotifier processEventNotifier;
-
- public ServiceTaskProcessor(WorkflowTaskEntityMapper taskEntityMapper,
WorkflowEventNotifier eventNotifier) {
- super(taskEntityMapper);
- this.taskEventNotifier = eventNotifier.getTaskEventNotifier();
- this.processEventNotifier = eventNotifier.getProcessEventNotifier();
- }
+ @Autowired
+ private ObjectMapper objectMapper;
+ @Autowired
+ private WorkflowTaskEntityMapper taskEntityMapper;
+ @Autowired
+ private TaskEventNotifier taskEventNotifier;
+ @Autowired
+ private ProcessEventNotifier processEventNotifier;
@Override
public Class<ServiceTask> watch() {
@@ -128,12 +134,15 @@ public class ServiceTaskProcessor extends
AbstractTaskProcessor<ServiceTask> {
return taskEntity;
}
- private void completeTaskEntity(WorkflowContext.ActionContext
actionContext, WorkflowTaskEntity taskEntity,
- TaskStatus taskStatus) {
+ private void completeTaskEntity(ActionContext actionContext,
WorkflowTaskEntity taskEntity, TaskStatus taskStatus) {
taskEntity.setStatus(taskStatus.name());
taskEntity.setOperator(taskEntity.getApprovers());
taskEntity.setRemark(actionContext.getRemark());
- taskEntity.setFormData(JsonUtils.toJson(actionContext.getForm()));
+ try {
+
taskEntity.setFormData(objectMapper.writeValueAsString(actionContext.getForm()));
+ } catch (Exception e) {
+ throw new JsonException("write form to json error: ", e);
+ }
taskEntity.setEndTime(new Date());
taskEntityMapper.update(taskEntity);
}
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
index 20fc2ea18..7619c2c77 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
@@ -17,35 +17,36 @@
package org.apache.inlong.manager.workflow.processor;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.manager.common.enums.ProcessStatus;
-import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.exceptions.JsonException;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
import org.apache.inlong.manager.workflow.WorkflowAction;
import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.workflow.definition.StartEvent;
import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.event.process.ProcessEventNotifier;
-import org.apache.inlong.manager.workflow.core.impl.WorkflowEventNotifier;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import java.util.Date;
/**
* Start event handler
*/
+@Service
public class StartEventProcessor extends
AbstractNextableElementProcessor<StartEvent> {
- private final WorkflowProcessEntityMapper processEntityMapper;
-
- private final ProcessEventNotifier processEventNotifier;
-
- public StartEventProcessor(WorkflowProcessEntityMapper
processEntityMapper, WorkflowEventNotifier eventNotifier) {
- this.processEntityMapper = processEntityMapper;
- this.processEventNotifier = eventNotifier.getProcessEventNotifier();
- }
+ @Autowired
+ private ObjectMapper objectMapper;
+ @Autowired
+ private ProcessEventNotifier processEventNotifier;
+ @Autowired
+ private WorkflowProcessEntityMapper processEntityMapper;
@Override
public Class<StartEvent> watch() {
@@ -90,7 +91,11 @@ public class StartEventProcessor extends
AbstractNextableElementProcessor<StartE
processEntity.setInlongGroupId(form.getInlongGroupId());
processEntity.setApplicant(applicant);
processEntity.setStatus(ProcessStatus.PROCESSING.name());
- processEntity.setFormData(JsonUtils.toJson(form));
+ try {
+ processEntity.setFormData(objectMapper.writeValueAsString(form));
+ } catch (Exception e) {
+ throw new JsonException("write form to json error: ", e);
+ }
processEntity.setStartTime(new Date());
processEntity.setHidden(process.getHidden());
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/UserTaskProcessor.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/UserTaskProcessor.java
index ec64c5611..6fe6ba2b5 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/UserTaskProcessor.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/UserTaskProcessor.java
@@ -17,51 +17,54 @@
package org.apache.inlong.manager.workflow.processor;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.TaskStatus;
+import org.apache.inlong.manager.common.exceptions.JsonException;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
-import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
import org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
-import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
import org.apache.inlong.manager.workflow.WorkflowAction;
import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.core.impl.WorkflowEventNotifier;
import org.apache.inlong.manager.workflow.definition.Element;
import org.apache.inlong.manager.workflow.definition.UserTask;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.apache.inlong.manager.workflow.event.task.TaskEventNotifier;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
/**
* User task processor
*/
+@Slf4j
+@Service
public class UserTaskProcessor extends AbstractTaskProcessor<UserTask> {
private static final Set<WorkflowAction> SHOULD_CHECK_OPERATOR_ACTIONS =
ImmutableSet
.of(WorkflowAction.APPROVE, WorkflowAction.REJECT,
WorkflowAction.TRANSFER);
-
private static final Set<WorkflowAction> SUPPORT_ACTIONS = ImmutableSet.of(
WorkflowAction.APPROVE, WorkflowAction.REJECT,
WorkflowAction.TRANSFER, WorkflowAction.CANCEL,
WorkflowAction.TERMINATE
);
- private final TaskEventNotifier taskEventNotifier;
- public UserTaskProcessor(WorkflowTaskEntityMapper taskEntityMapper,
WorkflowEventNotifier eventNotifier) {
- super(taskEntityMapper);
- this.taskEventNotifier = eventNotifier.getTaskEventNotifier();
- }
+ @Autowired
+ private ObjectMapper objectMapper;
+ @Autowired
+ private TaskEventNotifier taskEventNotifier;
@Override
public Class<UserTask> watch() {
@@ -170,17 +173,33 @@ public class UserTaskProcessor extends
AbstractTaskProcessor<UserTask> {
taskEntity.setRemark(actionContext.getRemark());
UserTask userTask = (UserTask) actionContext.getTask();
- if (needForm(userTask, actionContext.getAction())) {
- Preconditions.checkNotNull(actionContext.getForm(), "form cannot
be null");
-
Preconditions.checkTrue(actionContext.getForm().getClass().isAssignableFrom(userTask.getFormClass()),
- "form type not match, should be class " +
userTask.getFormClass());
- actionContext.getForm().validate();
- taskEntity.setFormData(JsonUtils.toJson(actionContext.getForm()));
- } else {
- Preconditions.checkNull(actionContext.getForm(), "no form
required");
+ try {
+ TaskForm taskForm = actionContext.getForm();
+ if (needForm(userTask, actionContext.getAction())) {
+ Preconditions.checkNotNull(taskForm, "form cannot be null");
+
Preconditions.checkTrue(taskForm.getClass().isAssignableFrom(userTask.getFormClass()),
+ "form type not match, should be class " +
userTask.getFormClass());
+ taskForm.validate();
+
taskEntity.setFormData(objectMapper.writeValueAsString(taskForm));
+ } else {
+ Preconditions.checkNull(taskForm, "no form required");
+ }
+ taskEntity.setEndTime(new Date());
+
+ Map<String, Object> extMap = new HashMap<>();
+ if (StringUtils.isNotBlank(taskEntity.getExtParams())) {
+ extMap = objectMapper.readValue(taskEntity.getExtParams(),
+
objectMapper.getTypeFactory().constructMapType(Map.class, String.class,
Object.class));
+ if (WorkflowAction.TRANSFER.equals(actionContext.getAction()))
{
+ extMap.put(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY,
actionContext.getTransferToUsers());
+ }
+ }
+ String extParams = objectMapper.writeValueAsString(extMap);
+ taskEntity.setExtParams(extParams);
+ } catch (JsonProcessingException e) {
+ log.error("parse transfer users error: ", e);
+ throw new JsonException("parse transfer users error");
}
- taskEntity.setEndTime(new Date());
- taskEntity.setExtParams(handlerExt(actionContext,
taskEntity.getExtParams()));
taskEntityMapper.update(taskEntity);
}
@@ -192,18 +211,6 @@ public class UserTaskProcessor extends
AbstractTaskProcessor<UserTask> {
return WorkflowAction.APPROVE.equals(workflowAction) ||
WorkflowAction.COMPLETE.equals(workflowAction);
}
- private String handlerExt(WorkflowContext.ActionContext actionContext,
String oldExt) {
- Map<String, Object> extMap = Optional.ofNullable(oldExt)
- .map(e -> JsonUtils.parseMap(oldExt, String.class,
Object.class))
- .orElseGet(Maps::newHashMap);
-
- if (WorkflowAction.TRANSFER.equals(actionContext.getAction())) {
- extMap.put(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY,
actionContext.getTransferToUsers());
- }
-
- return JsonUtils.toJson(extMap);
- }
-
private TaskStatus toTaskState(WorkflowAction workflowAction) {
switch (workflowAction) {
case APPROVE:
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowBeanUtils.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowBeanUtils.java
index cebf3f2a5..c89475860 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowBeanUtils.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowBeanUtils.java
@@ -17,16 +17,20 @@
package org.apache.inlong.manager.workflow.util;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.enums.TaskStatus;
+import org.apache.inlong.manager.common.exceptions.JsonException;
import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
import org.apache.inlong.manager.common.pojo.workflow.TaskResponse;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
-import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
import org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,15 +44,17 @@ import java.util.stream.Collectors;
*/
public class WorkflowBeanUtils {
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger LOGGER =
LoggerFactory.getLogger(WorkflowBeanUtils.class);
/**
* Build workflow context from WorkflowProcess and WorkflowProcessEntity
*/
- public static WorkflowContext buildContext(WorkflowProcess process,
WorkflowProcessEntity processEntity) {
+ public static WorkflowContext buildContext(ObjectMapper objectMapper,
WorkflowProcess process,
+ WorkflowProcessEntity processEntity) {
ProcessForm processForm = null;
try {
- processForm =
WorkflowFormParserUtils.parseProcessForm(processEntity.getFormData(), process);
+ processForm =
WorkflowFormParserUtils.parseProcessForm(objectMapper,
processEntity.getFormData(), process);
} catch (Exception e) {
LOGGER.error("build context from process form failed with id: {}",
processEntity.getId(), e);
}
@@ -91,7 +97,8 @@ public class WorkflowBeanUtils {
if (entity == null) {
return null;
}
- return ProcessResponse.builder()
+
+ ProcessResponse processResponse = ProcessResponse.builder()
.id(entity.getId())
.name(entity.getName())
.displayName(entity.getDisplayName())
@@ -101,9 +108,26 @@ public class WorkflowBeanUtils {
.status(ProcessStatus.valueOf(entity.getStatus()))
.startTime(entity.getStartTime())
.endTime(entity.getEndTime())
- .formData(JsonUtils.parse(entity.getFormData()))
- .extParams(JsonUtils.parse(entity.getExtParams()))
.build();
+
+ try {
+ JsonNode formData = null;
+ if (StringUtils.isNotBlank(entity.getFormData())) {
+ formData = OBJECT_MAPPER.readTree(entity.getFormData());
+ }
+ processResponse.setFormData(formData);
+
+ JsonNode extParams = null;
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ extParams = OBJECT_MAPPER.readTree(entity.getExtParams());
+ }
+ processResponse.setExtParams(extParams);
+ } catch (JsonProcessingException e) {
+ LOGGER.error("parse form data error: ", e);
+ throw new JsonException("parse form data or ext params error");
+ }
+
+ return processResponse;
}
/**
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
index 49d6b13a4..5853c47c8 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
@@ -18,14 +18,14 @@
package org.apache.inlong.manager.workflow.util;
import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.exceptions.FormParseException;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
import org.apache.inlong.manager.workflow.definition.UserTask;
import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
import org.apache.inlong.manager.workflow.definition.WorkflowTask;
@@ -39,8 +39,8 @@ public class WorkflowFormParserUtils {
/**
* Parse the task form in JSON string format into a WorkflowTask instance
*/
- public static <T extends TaskForm> T parseTaskForm(WorkflowTaskEntity
workflowTaskEntity, WorkflowProcess process)
- throws FormParseException {
+ public static <T extends TaskForm> T parseTaskForm(ObjectMapper
objectMapper,
+ WorkflowTaskEntity workflowTaskEntity, WorkflowProcess process)
throws FormParseException {
Preconditions.checkNotNull(workflowTaskEntity, "workflowTaskEntity
cannot be null");
Preconditions.checkNotNull(process, "process cannot be null");
@@ -54,10 +54,10 @@ public class WorkflowFormParserUtils {
UserTask userTask = (UserTask) task;
try {
- JavaType javaType =
JsonUtils.OBJECT_MAPPER.constructType(userTask.getFormClass());
- return JsonUtils.parse(workflowTaskEntity.getFormData(), javaType);
+ JavaType javaType =
objectMapper.constructType(userTask.getFormClass());
+ return objectMapper.readValue(workflowTaskEntity.getFormData(),
javaType);
} catch (Exception e) {
- log.error("task form parse failed, form is: {}",
workflowTaskEntity.getFormData(), e);
+ log.error("task parsed failed for form {}",
workflowTaskEntity.getFormData(), e);
throw new FormParseException("task form parse failed");
}
}
@@ -65,8 +65,8 @@ public class WorkflowFormParserUtils {
/**
* Parse the process form in JSON string format into a WorkflowProcess
instance
*/
- public static <T extends ProcessForm> T parseProcessForm(String form,
WorkflowProcess process)
- throws FormParseException {
+ public static <T extends ProcessForm> T parseProcessForm(ObjectMapper
objectMapper, String form,
+ WorkflowProcess process) throws FormParseException {
Preconditions.checkNotNull(process, "process cannot be null");
if (StringUtils.isEmpty(form)) {
@@ -74,8 +74,8 @@ public class WorkflowFormParserUtils {
}
try {
- JavaType javaType =
JsonUtils.OBJECT_MAPPER.constructType(process.getFormClass());
- return JsonUtils.parse(form, javaType);
+ JavaType javaType =
objectMapper.constructType(process.getFormClass());
+ return objectMapper.readValue(form, javaType);
} catch (Exception e) {
log.error("process form parse failed, form is: {}", form, e);
throw new FormParseException("process form parse failed");