This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 729bb446a [INLONG-4300][Manager] Autowired the workflow resources to 
simplify the bean management (#4302)
729bb446a is described below

commit 729bb446a8351f792a324ec3b877097c985af469
Author: healzhou <[email protected]>
AuthorDate: Mon May 23 15:51:54 2022 +0800

    [INLONG-4300][Manager] Autowired the workflow resources to simplify the bean 
management (#4302)
    
    * [INLONG-4300][Manager] Autowired the workflow resources to simplify the 
bean management
    
    * [INLONG-4300][Manager] Replace JsonUtils with global ObjectMapper
---
 .../client/api/inner/InnerInlongManagerClient.java |  15 +-
 .../client/api/util/InlongGroupTransfer.java       |  17 +-
 .../inlong/manager/common/auth/Authentication.java |  22 +--
 .../manager/common/auth/DefaultAuthentication.java |   3 +-
 .../manager/common/auth/SecretAuthentication.java  |   3 +-
 .../common/auth/SecretTokenAuthentication.java     |   3 +-
 .../manager/common/auth/TokenAuthentication.java   |   3 +-
 .../inlong/manager/common/util/JsonUtils.java      | 173 ---------------------
 .../plugin/listener/DeleteSortListener.java        |   6 +-
 .../plugin/listener/RestartSortListener.java       |  12 +-
 .../plugin/listener/StartupSortListener.java       |  12 +-
 .../plugin/listener/SuspendSortListener.java       |   6 +-
 .../manager/service/CommonOperateService.java      |  28 ++--
 .../core/operationlog/OperationLogRecorder.java    |  19 ++-
 .../service/sort/PushSortConfigListener.java       |  14 +-
 .../workflow/WorkflowDefinitionRegister.java       |  14 +-
 .../service/workflow/WorkflowEngineConfig.java     |  81 ----------
 .../service/workflow/WorkflowServiceImpl.java      |  33 ++--
 .../NewConsumptionProcessDetailHandler.java        |  16 +-
 .../manager/service/sort/DisableZkForSortTest.java |   5 +-
 .../source/listener/DataSourceListenerTest.java    |   8 +-
 .../service/workflow/WorkflowServiceImplTest.java  |  14 +-
 .../manager/workflow/core/TransactionHelper.java   |  10 +-
 .../manager/workflow/core/WorkflowEngine.java      |  53 -------
 .../core/impl/EventListenerServiceImpl.java        |  44 +++---
 .../core/impl/ProcessDefinitionServiceImpl.java    |  10 +-
 .../workflow/core/impl/ProcessServiceImpl.java     |  23 ++-
 .../workflow/core/impl/ProcessorExecutorImpl.java  |  48 +++---
 .../workflow/core/impl/TaskServiceImpl.java        |  17 +-
 .../core/impl/WorkflowContextBuilderImpl.java      |  77 +++++----
 .../workflow/core/impl/WorkflowEngineImpl.java     | 115 --------------
 .../workflow/core/impl/WorkflowEventNotifier.java  |  48 ------
 .../core/impl/WorkflowQueryServiceImpl.java        |   7 +-
 .../manager/workflow/definition/ServiceTask.java   |  14 +-
 .../event/EventListenerManagerFactory.java         |  61 --------
 .../workflow/event/LogableEventListener.java       |   7 +-
 .../event/process/ProcessEventListenerManager.java |   4 +
 .../event/process/ProcessEventNotifier.java        |  14 +-
 .../event/task/TaskEventListenerManager.java       |   2 +
 .../workflow/event/task/TaskEventNotifier.java     |   9 +-
 .../AbstractNextableElementProcessor.java          |   3 +-
 .../workflow/processor/AbstractTaskProcessor.java  |   6 +-
 .../workflow/processor/EndEventProcessor.java      |  21 ++-
 .../workflow/processor/ServiceTaskProcessor.java   |  35 +++--
 .../workflow/processor/StartEventProcessor.java    |  29 ++--
 .../workflow/processor/UserTaskProcessor.java      |  73 +++++----
 .../manager/workflow/util/WorkflowBeanUtils.java   |  38 ++++-
 .../workflow/util/WorkflowFormParserUtils.java     |  24 +--
 48 files changed, 439 insertions(+), 860 deletions(-)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index ec66e9258..915d58d08 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -56,7 +56,6 @@ import 
org.apache.inlong.manager.common.pojo.transform.TransformResponse;
 import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.common.util.AssertUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
 
 import java.util.List;
 
@@ -66,6 +65,7 @@ import java.util.List;
 @Slf4j
 public class InnerInlongManagerClient {
 
+    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     protected static final String HTTP_PATH = "api/inlong/manager";
 
     protected final OkHttpClient httpClient;
@@ -178,7 +178,7 @@ public class InnerInlongManagerClient {
             pageNum = 1;
         }
 
-        ObjectNode groupQuery = JsonUtils.OBJECT_MAPPER.createObjectNode();
+        ObjectNode groupQuery = OBJECT_MAPPER.createObjectNode();
         groupQuery.put("keyword", keyword);
         groupQuery.put("status", status);
         groupQuery.put("pageNum", pageNum);
@@ -234,9 +234,9 @@ public class InnerInlongManagerClient {
             assert response.body() != null;
             String body = response.body().string();
             assertHttpSuccess(response, body, path);
-            return JsonUtils.parse(body,
-                    new 
TypeReference<Response<PageInfo<InlongGroupListResponse>>>() {
-                    });
+
+            return OBJECT_MAPPER.readValue(body, new 
TypeReference<Response<PageInfo<InlongGroupListResponse>>>() {
+            });
         }
     }
 
@@ -818,12 +818,11 @@ public class InnerInlongManagerClient {
     public WorkflowResult startInlongGroup(int taskId,
             Pair<InlongGroupApproveRequest, List<InlongStreamApproveRequest>> 
initMsg) {
 
-        ObjectMapper objectMapper = JsonUtils.OBJECT_MAPPER;
-        ObjectNode workflowTaskOperation = objectMapper.createObjectNode();
+        ObjectNode workflowTaskOperation = OBJECT_MAPPER.createObjectNode();
         workflowTaskOperation.putPOJO("transferTo", Lists.newArrayList());
         workflowTaskOperation.put("remark", "approved by system");
 
-        ObjectNode inlongGroupApproveForm = objectMapper.createObjectNode();
+        ObjectNode inlongGroupApproveForm = OBJECT_MAPPER.createObjectNode();
         inlongGroupApproveForm.putPOJO("groupApproveInfo", initMsg.getKey());
         inlongGroupApproveForm.putPOJO("streamApproveInfoList", 
initMsg.getValue());
         inlongGroupApproveForm.put("formName", "InlongGroupApproveForm");
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index 8ee84685c..f96ed3acd 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.client.api.util;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.reflect.TypeToken;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
@@ -40,7 +41,6 @@ import 
org.apache.inlong.manager.common.pojo.sort.FlinkSortConf;
 import org.apache.inlong.manager.common.pojo.sort.UserDefinedSortConf;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.AssertUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -51,6 +51,8 @@ import java.util.Map;
  */
 public class InlongGroupTransfer {
 
+    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     /**
      * Parse MQ config from inlong group info.
      */
@@ -256,7 +258,11 @@ public class InlongGroupTransfer {
         if (MapUtils.isNotEmpty(flinkSortConf.getProperties())) {
             InlongGroupExtInfo flinkProperties = new InlongGroupExtInfo();
             flinkProperties.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
-            
flinkProperties.setKeyValue(JsonUtils.toJson(flinkSortConf.getProperties()));
+            try {
+                
flinkProperties.setKeyValue(OBJECT_MAPPER.writeValueAsString(flinkSortConf.getProperties()));
+            } catch (Exception e) {
+                throw new RuntimeException("get json for sort properties 
error: " + e.getMessage());
+            }
             extInfos.add(flinkProperties);
         }
         return extInfos;
@@ -278,7 +284,12 @@ public class InlongGroupTransfer {
         if (MapUtils.isNotEmpty(userDefinedSortConf.getProperties())) {
             InlongGroupExtInfo flinkProperties = new InlongGroupExtInfo();
             flinkProperties.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
-            
flinkProperties.setKeyValue(JsonUtils.toJson(userDefinedSortConf.getProperties()));
+            flinkProperties.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+            try {
+                
flinkProperties.setKeyValue(OBJECT_MAPPER.writeValueAsString(userDefinedSortConf.getProperties()));
+            } catch (Exception e) {
+                throw new RuntimeException("get json for sort properties 
error: " + e.getMessage());
+            }
             extInfos.add(flinkProperties);
         }
         return extInfos;
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
index 72c709baa..9691d44c1 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
@@ -17,22 +17,25 @@
 
 package org.apache.inlong.manager.common.auth;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import java.util.Locale;
 import java.util.Map;
 
 public interface Authentication {
 
+    ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    AuthType getAuthType();
+
+    void configure(Map<String, String> properties);
+
     enum AuthType {
         UNAME_PASSWD,
         TOKEN,
         SECRET,
         SECRET_AND_TOKEN;
 
-        @Override
-        public String toString() {
-            return this.name().toLowerCase(Locale.ROOT);
-        }
-
         public static AuthType forType(String type) {
             for (AuthType authType : values()) {
                 if (authType.name().equals(type.toUpperCase())) {
@@ -41,9 +44,10 @@ public interface Authentication {
             }
             throw new IllegalArgumentException(String.format("Unsupported 
authType=%s for Inlong", type));
         }
-    }
 
-    AuthType getAuthType();
-
-    void configure(Map<String, String> properties);
+        @Override
+        public String toString() {
+            return this.name().toLowerCase(Locale.ROOT);
+        }
+    }
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/DefaultAuthentication.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/DefaultAuthentication.java
index 9534f3b9b..b2a146dbe 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/DefaultAuthentication.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/DefaultAuthentication.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.util.AssertUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
 
 import java.util.Map;
 
@@ -60,7 +59,7 @@ public class DefaultAuthentication implements Authentication {
 
     @Override
     public String toString() {
-        ObjectNode objectNode = JsonUtils.OBJECT_MAPPER.createObjectNode();
+        ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
         objectNode.put(USER_NAME, this.getUserName());
         objectNode.put(PASSWORD, this.getPassword());
         return objectNode.toString();
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretAuthentication.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretAuthentication.java
index 29ca21cde..1c37d05e3 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretAuthentication.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretAuthentication.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.util.AssertUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
 
 import java.util.Map;
 
@@ -60,7 +59,7 @@ public class SecretAuthentication implements Authentication {
 
     @Override
     public String toString() {
-        ObjectNode objectNode = JsonUtils.OBJECT_MAPPER.createObjectNode();
+        ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
         objectNode.put(SECRET_ID, this.getSecretId());
         objectNode.put(SECRET_KEY, this.getSecretKey());
         return objectNode.toString();
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretTokenAuthentication.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretTokenAuthentication.java
index 88e50686c..5908629c1 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretTokenAuthentication.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretTokenAuthentication.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
-import org.apache.inlong.manager.common.util.JsonUtils;
 
 import java.util.Map;
 
@@ -56,7 +55,7 @@ public class SecretTokenAuthentication extends 
SecretAuthentication {
     @SneakyThrows
     @Override
     public String toString() {
-        ObjectNode objectNode = JsonUtils.OBJECT_MAPPER.createObjectNode();
+        ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
         objectNode.put(SECRET_ID, this.getSecretId());
         objectNode.put(SECRET_KEY, this.getSecretKey());
         objectNode.put(SECRET_TOKEN, this.getSToken());
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/TokenAuthentication.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/TokenAuthentication.java
index 88defdecf..7857bf235 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/TokenAuthentication.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/TokenAuthentication.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.util.AssertUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
 
 import java.util.Map;
 
@@ -53,7 +52,7 @@ public class TokenAuthentication implements Authentication {
 
     @Override
     public String toString() {
-        ObjectNode objectNode = JsonUtils.OBJECT_MAPPER.createObjectNode();
+        ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
         objectNode.put(TOKEN, this.getToken());
         return objectNode.toString();
     }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
deleted file mode 100644
index af36f6c34..000000000
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.util;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.PropertyAccessor;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.exceptions.JsonException;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * JSON utils
- */
-@Slf4j
-@UtilityClass
-public class JsonUtils {
-
-    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
-    static {
-        OBJECT_MAPPER.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
-        OBJECT_MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, 
false);
-        
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-    }
-
-    /**
-     * The instance need to transform to string
-     *
-     * @param obj instance need to transform
-     * @return JSON string after transform
-     */
-    public static String toJson(Object obj) {
-        if (obj == null) {
-            return null;
-        }
-        if (obj.getClass() == String.class) {
-            return (String) obj;
-        }
-        try {
-            return OBJECT_MAPPER.writeValueAsString(obj);
-        } catch (JsonProcessingException e) {
-            log.error("JSON transform error: {}", obj, e);
-            throw new JsonException("JSON transform error");
-        }
-    }
-
-    /**
-     * Transform the string to specify type
-     *
-     * @param json JSON string
-     * @param type specify type
-     * @param <T> type
-     * @return instance of type T
-     */
-    public static <T> T parse(String json, TypeReference<T> type) {
-        try {
-            return OBJECT_MAPPER.readValue(json, type);
-        } catch (IOException e) {
-            log.error("JSON transform error: {}", json, e);
-            throw new JsonException("JSON transform error");
-        }
-    }
-
-    /**
-     * Transform the string to JsonNode
-     *
-     * @param json JSON string
-     * @return JsonNode instance after transform
-     */
-    public static JsonNode parse(String json) {
-        if (StringUtils.isBlank(json)) {
-            return null;
-        }
-        try {
-            return OBJECT_MAPPER.readTree(json);
-        } catch (IOException e) {
-            log.error("JSON transform error: {}", json, e);
-            throw new JsonException("JSON transform error");
-        }
-    }
-
-    /**
-     * Transform JSON string to Java instance
-     *
-     * @param json JSON string
-     * @param tClass Java instance type
-     * @param <T> type
-     * @return java instance after transform
-     */
-    public static <T> T parse(String json, Class<T> tClass) {
-        try {
-            return OBJECT_MAPPER.readValue(json, tClass);
-        } catch (IOException e) {
-            log.error("JSON transform error: {}", json, e);
-            throw new JsonException("JSON transform error");
-        }
-    }
-
-    public static <T> T parse(String json, JavaType javaType) {
-        try {
-            return OBJECT_MAPPER.readValue(json, javaType);
-        } catch (IOException e) {
-            log.error("JSON transform error: {}", json, e);
-            throw new JsonException("JSON transform error");
-        }
-    }
-
-    /**
-     * Transform JSON string to List
-     *
-     * @param json JSON string
-     * @param eClass element class
-     * @param <E> element type
-     * @return list after transform
-     */
-    public static <E> List<E> parseList(String json, Class<E> eClass) {
-        try {
-            return OBJECT_MAPPER.readValue(json,
-                    
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, eClass));
-        } catch (IOException e) {
-            log.error("JSON transform error: {}", json, e);
-            throw new JsonException("JSON transform error");
-        }
-    }
-
-    /**
-     * Transform JSON string to Map
-     *
-     * @param json JSON string
-     * @param kClass key type in Map
-     * @param vClass value type in Map
-     * @param <K> key type
-     * @param <V> value type
-     * @return map after transform
-     */
-    public static <K, V> Map<K, V> parseMap(String json, Class<K> kClass, 
Class<V> vClass) {
-        try {
-            return OBJECT_MAPPER.readValue(json,
-                    OBJECT_MAPPER.getTypeFactory().constructMapType(Map.class, 
kClass, vClass));
-        } catch (IOException e) {
-            log.error("JSON transform error: {}", json, e);
-            throw new JsonException("JSON transform error");
-        }
-    }
-
-}
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index e16256823..ccdb162ef 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
@@ -25,7 +26,6 @@ import 
org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import 
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -46,6 +46,8 @@ import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 @Slf4j
 public class DeleteSortListener implements SortOperateListener {
 
+    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -76,7 +78,7 @@ public class DeleteSortListener implements 
SortOperateListener {
             return ListenerResult.fail(message);
         }
 
-        Map<String, String> result = 
JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+        Map<String, String> result = 
OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
                 new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 511fd2f0e..485d1bca4 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
@@ -26,11 +27,10 @@ import 
org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import 
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -50,6 +50,8 @@ import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 @Slf4j
 public class RestartSortListener implements SortOperateListener {
 
+    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -80,7 +82,7 @@ public class RestartSortListener implements 
SortOperateListener {
             return ListenerResult.fail(message);
         }
 
-        Map<String, String> result = 
JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+        Map<String, String> result = 
OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
                 new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
@@ -97,8 +99,8 @@ public class RestartSortListener implements 
SortOperateListener {
         }
 
         // TODO Support more than one dataflow in one sort job
-        Map<String, JsonNode> dataflowMap = 
JsonUtils.OBJECT_MAPPER.convertValue(
-                JsonUtils.OBJECT_MAPPER.readTree(dataFlows), new 
TypeReference<Map<String, JsonNode>>() {
+        Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
+                OBJECT_MAPPER.readTree(dataFlows), new 
TypeReference<Map<String, JsonNode>>() {
                 });
         Optional<JsonNode> dataflowOptional = 
dataflowMap.values().stream().findFirst();
         JsonNode dataFlow = null;
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index ede84db6f..150780967 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
@@ -26,11 +27,10 @@ import 
org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import 
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -50,6 +50,8 @@ import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 @Slf4j
 public class StartupSortListener implements SortOperateListener {
 
+    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -76,7 +78,7 @@ public class StartupSortListener implements 
SortOperateListener {
                 InlongGroupExtInfo::getKeyValue));
         String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
         if (StringUtils.isNotEmpty(sortExt)) {
-            Map<String, String> result = 
JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+            Map<String, String> result = 
OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
                     new TypeReference<Map<String, String>>() {
                     });
             kvConf.putAll(result);
@@ -88,8 +90,8 @@ public class StartupSortListener implements 
SortOperateListener {
             log.error(message);
             return ListenerResult.fail(message);
         }
-        Map<String, JsonNode> dataflowMap = 
JsonUtils.OBJECT_MAPPER.convertValue(
-                JsonUtils.OBJECT_MAPPER.readTree(dataFlows), new 
TypeReference<Map<String, JsonNode>>() {
+        Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
+                OBJECT_MAPPER.readTree(dataFlows), new 
TypeReference<Map<String, JsonNode>>() {
                 });
         Optional<JsonNode> dataflowOptional = 
dataflowMap.values().stream().findFirst();
         JsonNode dataFlow = null;
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index f4f9d44c6..16ebfd291 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
@@ -25,7 +26,6 @@ import 
org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import 
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -46,6 +46,8 @@ import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 @Slf4j
 public class SuspendSortListener implements SortOperateListener {
 
+    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -76,7 +78,7 @@ public class SuspendSortListener implements 
SortOperateListener {
             return ListenerResult.fail(message);
         }
 
-        Map<String, String> result = 
JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(sortExt),
+        Map<String, String> result = 
OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
                 new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index 626ac9fa6..157952c85 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -26,7 +27,6 @@ import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity;
@@ -50,6 +50,8 @@ public class CommonOperateService {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CommonOperateService.class);
 
+    @Autowired
+    public ObjectMapper objectMapper;
     @Autowired
     private InlongGroupEntityMapper groupMapper;
     @Autowired
@@ -137,14 +139,22 @@ public class CommonOperateService {
         if (clusterEntity == null || 
StringUtils.isBlank(clusterEntity.getExtParams())) {
             throw new BusinessException("pulsar cluster or pulsar ext params 
is empty");
         }
-        Map<String, String> configParams = 
JsonUtils.parse(clusterEntity.getExtParams(), Map.class);
-        PulsarClusterInfo pulsarClusterInfo = 
PulsarClusterInfo.builder().brokerServiceUrl(
-                
clusterEntity.getUrl()).token(clusterEntity.getToken()).build();
-        String adminUrl = 
configParams.get(InlongGroupSettings.PULSAR_ADMIN_URL);
-        Preconditions.checkNotNull(adminUrl, "adminUrl is empty, check third 
party cluster table");
-        pulsarClusterInfo.setAdminUrl(adminUrl);
-        pulsarClusterInfo.setType(clusterEntity.getType());
-        return pulsarClusterInfo;
+
+        PulsarClusterInfo pulsarCluster = PulsarClusterInfo.builder()
+                .brokerServiceUrl(clusterEntity.getUrl())
+                .token(clusterEntity.getToken())
+                .build();
+        try {
+            Map<String, String> configParams = 
objectMapper.readValue(clusterEntity.getExtParams(), Map.class);
+            String adminUrl = 
configParams.get(InlongGroupSettings.PULSAR_ADMIN_URL);
+            pulsarCluster.setAdminUrl(adminUrl);
+        } catch (Exception e) {
+            LOGGER.error("parse pulsar cluster info error: ", e);
+        }
+
+        Preconditions.checkNotNull(pulsarCluster.getAdminUrl(), "adminUrl is 
empty, check third party cluster table");
+        pulsarCluster.setType(clusterEntity.getType());
+        return pulsarCluster;
     }
 
     /**
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operationlog/OperationLogRecorder.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operationlog/OperationLogRecorder.java
index 597afdc25..82a20ef72 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operationlog/OperationLogRecorder.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operationlog/OperationLogRecorder.java
@@ -17,14 +17,12 @@
 
 package org.apache.inlong.manager.service.core.operationlog;
 
-import java.util.Date;
-import java.util.Optional;
-import javax.servlet.http.HttpServletRequest;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.pojo.user.UserDetail;
-import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.LoginUserUtils;
 import org.apache.inlong.manager.common.util.NetworkUtils;
 import org.apache.inlong.manager.dao.entity.OperationLogEntity;
@@ -33,6 +31,10 @@ import 
org.springframework.web.context.request.RequestAttributes;
 import org.springframework.web.context.request.RequestContextHolder;
 import org.springframework.web.context.request.ServletRequestAttributes;
 
+import javax.servlet.http.HttpServletRequest;
+import java.util.Date;
+import java.util.Optional;
+
 /**
  * Operation of log aspect
  */
@@ -40,6 +42,7 @@ import 
org.springframework.web.context.request.ServletRequestAttributes;
 public class OperationLogRecorder {
 
     private static final String ANONYMOUS_USER = "AnonymousUser";
+    private static final Gson GSON = new GsonBuilder().create(); // thread safe
 
     /**
      * Save operation logs of all Controller
@@ -60,8 +63,8 @@ public class OperationLogRecorder {
         String requestUrl = request.getRequestURI();
         String httpMethod = request.getMethod();
         String remoteAddress = NetworkUtils.getClientIpAddress(request);
-        String param = JsonUtils.toJson(request.getParameterMap());
-        String body = JsonUtils.toJson(joinPoint.getArgs());
+        String param = GSON.toJson(request.getParameterMap());
+        String body = GSON.toJson(joinPoint.getArgs());
 
         OperationType operationType = operationLog.operation();
 
@@ -92,9 +95,9 @@ public class OperationLogRecorder {
             if (operationLog.db()) {
                 OperationLogPool.publish(operationLogEntity);
             } else if (success) {
-                log.info("operation log: {}", 
JsonUtils.toJson(operationLogEntity));
+                log.info("operation log: {}", GSON.toJson(operationLogEntity));
             } else {
-                log.error("request handle failed : {}", 
JsonUtils.toJson(operationLogEntity));
+                log.error("request handle failed : {}", 
GSON.toJson(operationLogEntity));
             }
         }
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
index 626cb4d4a..492c44f2f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
@@ -17,13 +17,13 @@
 
 package org.apache.inlong.manager.service.sort;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import 
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.sort.util.DataFlowUtils;
@@ -57,6 +57,8 @@ public class PushSortConfigListener implements 
SortOperateListener {
     private StreamSinkService streamSinkService;
     @Autowired
     private DataFlowUtils dataFlowUtils;
+    @Autowired
+    private ObjectMapper objectMapper;
 
     @Override
     public TaskEvent event() {
@@ -86,13 +88,9 @@ public class PushSortConfigListener implements 
SortOperateListener {
                 LOGGER.debug("sink info: {}", sinkResponse);
             }
 
-            DataFlowInfo dataFlowInfo = 
dataFlowUtils.createDataFlow(groupInfo, sinkResponse);
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("try to push config to sort: {}", 
JsonUtils.toJson(dataFlowInfo));
-            }
-
             Integer sinkId = sinkResponse.getId();
             try {
+                DataFlowInfo dataFlowInfo = 
dataFlowUtils.createDataFlow(groupInfo, sinkResponse);
                 String zkUrl = clusterBean.getZkUrl();
                 String zkRoot = clusterBean.getZkRoot();
                 // push data flow info to zk
@@ -100,6 +98,10 @@ public class PushSortConfigListener implements 
SortOperateListener {
                 ZkTools.updateDataFlowInfo(dataFlowInfo, sortClusterName, 
sinkId, zkUrl, zkRoot);
                 // add sink id to zk
                 ZkTools.addDataFlowToCluster(sortClusterName, sinkId, zkUrl, 
zkRoot);
+
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("success to push config to sort: {}", 
objectMapper.writeValueAsString(dataFlowInfo));
+                }
             } catch (Exception e) {
                 LOGGER.error("push sort config to zookeeper failed, sinkId={} 
", sinkId, e);
                 throw new WorkflowListenerException("push sort config to 
zookeeper failed: " + e.getMessage());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinitionRegister.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinitionRegister.java
index 7f963f1aa..2933e0113 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinitionRegister.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinitionRegister.java
@@ -18,32 +18,32 @@
 package org.apache.inlong.manager.service.workflow;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.workflow.core.WorkflowEngine;
+import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
 import java.util.List;
 
-@Service
 @Slf4j
+@Service
 public class WorkflowDefinitionRegister {
 
     @Autowired
-    private WorkflowEngine workflowEngine;
+    private ProcessDefinitionService processDefService;
     @Autowired
     private List<WorkflowDefinition> workflowDefinitions;
 
     @PostConstruct
-    public void registerDefinition() {
-        workflowDefinitions.forEach(definition -> {
+    public void registerWorkflowDefinition() {
+        for (WorkflowDefinition definition : workflowDefinitions) {
             try {
-                
workflowEngine.processDefinitionService().register(definition.defineProcess());
+                processDefService.register(definition.defineProcess());
                 log.info("success register workflow definition: {}", 
definition.getProcessName());
             } catch (Exception e) {
                 log.error("failed to register workflow definition {}", 
definition.getProcessName(), e);
             }
-        });
+        }
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowEngineConfig.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowEngineConfig.java
deleted file mode 100644
index bc122e500..000000000
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowEngineConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
-import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
-import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
-import org.apache.inlong.manager.workflow.WorkflowConfig;
-import org.apache.inlong.manager.workflow.core.EventListenerService;
-import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
-import org.apache.inlong.manager.workflow.core.WorkflowEngine;
-import org.apache.inlong.manager.workflow.core.WorkflowQueryService;
-import 
org.apache.inlong.manager.workflow.core.impl.MemoryProcessDefinitionRepository;
-import org.apache.inlong.manager.workflow.core.impl.WorkflowEngineImpl;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.PlatformTransactionManager;
-
-/**
- * Workflow engine config
- */
-@Component
-@Configuration
-@Slf4j
-public class WorkflowEngineConfig {
-
-    @Autowired
-    private WorkflowQueryService queryService;
-    @Autowired
-    private MemoryProcessDefinitionRepository memoryProcessRepository;
-    @Autowired
-    private WorkflowProcessEntityMapper processEntityMapper;
-    @Autowired
-    private WorkflowTaskEntityMapper taskEntityMapper;
-    @Autowired
-    private WorkflowEventLogEntityMapper eventLogMapper;
-    @Autowired
-    private PlatformTransactionManager platformTransactionManager;
-
-    @Bean
-    public WorkflowEngine workflowEngine() {
-        WorkflowConfig workflowConfig = new WorkflowConfig()
-                .setQueryService(queryService)
-                .setProcessEntityMapper(processEntityMapper)
-                .setTaskEntityMapper(taskEntityMapper)
-                .setEventLogMapper(eventLogMapper)
-                .setDefinitionRepository(memoryProcessRepository)
-                .setTransactionManager(platformTransactionManager);
-
-        return new WorkflowEngineImpl(workflowConfig);
-    }
-
-    @Bean
-    public EventListenerService eventListenerService(WorkflowEngine 
workflowEngine) {
-        return workflowEngine.eventListenerService();
-    }
-
-    @Bean
-    public ProcessDefinitionService processDefinitionService(WorkflowEngine 
workflowEngine) {
-        return workflowEngine.processDefinitionService();
-    }
-
-}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
index b10cb13c2..9c1cdc180 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.workflow;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
@@ -43,7 +44,9 @@ import 
org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
 import 
org.apache.inlong.manager.service.workflow.WorkflowExecuteLog.ListenerExecutorLog;
 import 
org.apache.inlong.manager.service.workflow.WorkflowExecuteLog.TaskExecutorLog;
 import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.core.WorkflowEngine;
+import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
+import org.apache.inlong.manager.workflow.core.ProcessService;
+import org.apache.inlong.manager.workflow.core.TaskService;
 import org.apache.inlong.manager.workflow.core.WorkflowQueryService;
 import org.apache.inlong.manager.workflow.definition.UserTask;
 import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
@@ -70,51 +73,56 @@ public class WorkflowServiceImpl implements WorkflowService 
{
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(WorkflowServiceImpl.class);
 
-    @Autowired
-    private WorkflowEngine workflowEngine;
-
     @Autowired
     private WorkflowQueryService queryService;
+    @Autowired
+    private ProcessDefinitionService processDefService;
+    @Autowired
+    private ProcessService processService;
+    @Autowired
+    private TaskService taskService;
+    @Autowired
+    private ObjectMapper objectMapper;
 
     @Override
     @Transactional(noRollbackFor = WorkflowNoRollbackException.class, 
rollbackFor = Exception.class)
     public WorkflowResult start(ProcessName process, String applicant, 
ProcessForm form) {
-        WorkflowContext context = 
workflowEngine.processService().start(process.name(), applicant, form);
+        WorkflowContext context = processService.start(process.name(), 
applicant, form);
         return WorkflowBeanUtils.result(context);
     }
 
     @Override
     @Transactional(noRollbackFor = WorkflowNoRollbackException.class, 
rollbackFor = Exception.class)
     public WorkflowResult cancel(Integer processId, String operator, String 
remark) {
-        WorkflowContext context = 
workflowEngine.processService().cancel(processId, operator, remark);
+        WorkflowContext context = processService.cancel(processId, operator, 
remark);
         return WorkflowBeanUtils.result(context);
     }
 
     @Override
     @Transactional(noRollbackFor = WorkflowNoRollbackException.class, 
rollbackFor = Exception.class)
     public WorkflowResult approve(Integer taskId, String remark, TaskForm 
form, String operator) {
-        WorkflowContext context = workflowEngine.taskService().approve(taskId, 
remark, form, operator);
+        WorkflowContext context = taskService.approve(taskId, remark, form, 
operator);
         return WorkflowBeanUtils.result(context);
     }
 
     @Override
     @Transactional(noRollbackFor = WorkflowNoRollbackException.class, 
rollbackFor = Exception.class)
     public WorkflowResult reject(Integer taskId, String remark, String 
operator) {
-        WorkflowContext context = workflowEngine.taskService().reject(taskId, 
remark, operator);
+        WorkflowContext context = taskService.reject(taskId, remark, operator);
         return WorkflowBeanUtils.result(context);
     }
 
     @Override
     @Transactional(noRollbackFor = WorkflowNoRollbackException.class, 
rollbackFor = Exception.class)
     public WorkflowResult transfer(Integer taskId, String remark, List<String> 
to, String operator) {
-        WorkflowContext context = 
workflowEngine.taskService().transfer(taskId, remark, to, operator);
+        WorkflowContext context = taskService.transfer(taskId, remark, to, 
operator);
         return WorkflowBeanUtils.result(context);
     }
 
     @Override
     @Transactional(noRollbackFor = WorkflowNoRollbackException.class, 
rollbackFor = Exception.class)
     public WorkflowResult complete(Integer taskId, String remark, String 
operator) {
-        WorkflowContext context = 
workflowEngine.taskService().complete(taskId, remark, operator);
+        WorkflowContext context = taskService.complete(taskId, remark, 
operator);
         return WorkflowBeanUtils.result(context);
     }
 
@@ -252,13 +260,14 @@ public class WorkflowServiceImpl implements 
WorkflowService {
     }
 
     private Map<String, Object> getShowInList(WorkflowProcessEntity 
processEntity) {
-        WorkflowProcess process = 
workflowEngine.processDefinitionService().getByName(processEntity.getName());
+        WorkflowProcess process = 
processDefService.getByName(processEntity.getName());
         if (process == null || process.getFormClass() == null) {
             return null;
         }
 
         try {
-            ProcessForm processForm = 
WorkflowFormParserUtils.parseProcessForm(processEntity.getFormData(), process);
+            ProcessForm processForm = 
WorkflowFormParserUtils.parseProcessForm(objectMapper,
+                    processEntity.getFormData(), process);
             assert processForm != null;
             return processForm.showInList();
         } catch (Exception e) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
index 1996b0589..d9fe25a12 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.workflow.consumption;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.manager.common.pojo.workflow.ProcessDetailResponse;
 import 
org.apache.inlong.manager.common.pojo.workflow.form.NewConsumptionProcessForm;
 import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
@@ -32,21 +33,22 @@ import org.springframework.stereotype.Component;
 @Component
 public class NewConsumptionProcessDetailHandler implements 
ProcessDetailHandler {
 
+    @Autowired
+    private ObjectMapper objectMapper;
     @Autowired
     private ProcessDefinitionService processDefinitionService;
 
     @Override
-    public ProcessDetailResponse handle(ProcessDetailResponse 
workflowResponse) {
-        WorkflowProcess process = 
processDefinitionService.getByName(workflowResponse.getWorkflow().getName());
-
+    public ProcessDetailResponse handle(ProcessDetailResponse processResponse) 
{
+        WorkflowProcess process = 
processDefinitionService.getByName(processResponse.getWorkflow().getName());
         NewConsumptionProcessForm processForm = WorkflowFormParserUtils
-                
.parseProcessForm(workflowResponse.getProcessInfo().getFormData().toString(), 
process);
+                .parseProcessForm(objectMapper, 
processResponse.getProcessInfo().getFormData().toString(), process);
         if (processForm == null) {
-            return workflowResponse;
+            return processResponse;
         }
 
-        workflowResponse.getProcessInfo().setFormData(processForm);
-        return workflowResponse;
+        processResponse.getProcessInfo().setFormData(processForm);
+        return processResponse;
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
index cd1a27a52..115267879 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
@@ -130,7 +130,7 @@ public class DisableZkForSortTest extends 
WorkflowServiceImplTest {
         createHiveSink(streamInfo);
         createKafkaSource(streamInfo);
         mockTaskListenerFactory();
-        WorkflowContext context = 
workflowEngine.processService().start(processName.name(), applicant, form);
+        WorkflowContext context = processService.start(processName.name(), 
applicant, form);
         WorkflowResult result = WorkflowBeanUtils.result(context);
         ProcessResponse response = result.getProcessInfo();
         Assert.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
@@ -161,8 +161,7 @@ public class DisableZkForSortTest extends 
WorkflowServiceImplTest {
         form.setGroupOperateType(GroupOperateType.SUSPEND);
         taskListenerFactory.acceptPlugin(new MockPlugin());
 
-        WorkflowContext context = workflowEngine.processService()
-                .start(ProcessName.SUSPEND_GROUP_PROCESS.name(), applicant, 
form);
+        WorkflowContext context = 
processService.start(ProcessName.SUSPEND_GROUP_PROCESS.name(), applicant, form);
         WorkflowResult result = WorkflowBeanUtils.result(context);
         ProcessResponse response = result.getProcessInfo();
         Assert.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
index 684d4b5ff..8529d88ac 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
@@ -64,7 +64,7 @@ public class DataSourceListenerTest extends 
WorkflowServiceImplTest {
         sourceRequest.setSourceName("binlog-collect");
         return streamSourceService.save(sourceRequest, OPERATOR);
     }
-    
+
     /**
      * There will be concurrency problems in the overall operation,This method 
temporarily fails the test
      */
@@ -81,8 +81,7 @@ public class DataSourceListenerTest extends 
WorkflowServiceImplTest {
         form = new GroupResourceProcessForm();
         form.setGroupInfo(groupInfo);
         form.setGroupOperateType(GroupOperateType.SUSPEND);
-        WorkflowContext context = workflowEngine.processService()
-                .start(ProcessName.SUSPEND_GROUP_PROCESS.name(), applicant, 
form);
+        WorkflowContext context = 
processService.start(ProcessName.SUSPEND_GROUP_PROCESS.name(), applicant, form);
         WorkflowResult result = WorkflowBeanUtils.result(context);
         ProcessResponse response = result.getProcessInfo();
         Assert.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
@@ -112,8 +111,7 @@ public class DataSourceListenerTest extends 
WorkflowServiceImplTest {
         form = new GroupResourceProcessForm();
         form.setGroupInfo(groupInfo);
         form.setGroupOperateType(GroupOperateType.RESTART);
-        WorkflowContext context = workflowEngine.processService()
-                .start(ProcessName.RESTART_GROUP_PROCESS.name(), applicant, 
form);
+        WorkflowContext context = 
processService.start(ProcessName.RESTART_GROUP_PROCESS.name(), applicant, form);
         WorkflowResult result = WorkflowBeanUtils.result(context);
         ProcessResponse response = result.getProcessInfo();
         Assert.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index 21b946359..6bda5b50e 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -50,7 +50,7 @@ import 
org.apache.inlong.manager.service.resource.SinkResourceListener;
 import org.apache.inlong.manager.service.sort.PushSortConfigListener;
 import 
org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
 import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.core.WorkflowEngine;
+import org.apache.inlong.manager.workflow.core.ProcessService;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
 import org.apache.inlong.manager.workflow.definition.WorkflowTask;
@@ -95,7 +95,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
     @Autowired
     protected WorkflowServiceImpl workflowService;
     @Autowired
-    protected WorkflowEngine workflowEngine;
+    protected ProcessService processService;
     @Autowired
     protected InlongGroupService groupService;
     @Autowired
@@ -251,7 +251,7 @@ public class WorkflowServiceImplTest extends 
ServiceBaseTest {
     public void testStartCreatePulsarWorkflow() {
         initGroupForm(MQType.PULSAR.getType(), "test14" + subType);
         mockTaskListenerFactory();
-        WorkflowContext context = 
workflowEngine.processService().start(processName.name(), applicant, form);
+        WorkflowContext context = processService.start(processName.name(), 
applicant, form);
         WorkflowResult result = WorkflowBeanUtils.result(context);
         ProcessResponse view = result.getProcessInfo();
         // This method temporarily fails the test, so comment it out first
@@ -271,7 +271,7 @@ public class WorkflowServiceImplTest extends 
ServiceBaseTest {
     public void testStartCreateTubeWorkflow() {
         initGroupForm(MQType.TUBE.getType(), "test10" + subType);
         mockTaskListenerFactory();
-        WorkflowContext context = 
workflowEngine.processService().start(processName.name(), applicant, form);
+        WorkflowContext context = processService.start(processName.name(), 
applicant, form);
         WorkflowResult result = WorkflowBeanUtils.result(context);
         ProcessResponse response = result.getProcessInfo();
         Assert.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
@@ -297,7 +297,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
         form.setGroupOperateType(GroupOperateType.SUSPEND);
         taskListenerFactory.acceptPlugin(new MockPlugin());
 
-        WorkflowContext context = workflowEngine.processService()
+        WorkflowContext context = processService
                 .start(ProcessName.SUSPEND_GROUP_PROCESS.name(), applicant, 
form);
         WorkflowResult result = WorkflowBeanUtils.result(context);
         ProcessResponse response = result.getProcessInfo();
@@ -332,7 +332,7 @@ public class WorkflowServiceImplTest extends 
ServiceBaseTest {
         form.setGroupOperateType(GroupOperateType.RESTART);
         taskListenerFactory.acceptPlugin(new MockPlugin());
 
-        WorkflowContext context = workflowEngine.processService()
+        WorkflowContext context = processService
                 .start(ProcessName.RESTART_GROUP_PROCESS.name(), applicant, 
form);
         WorkflowResult result = WorkflowBeanUtils.result(context);
         ProcessResponse response = result.getProcessInfo();
@@ -368,7 +368,7 @@ public class WorkflowServiceImplTest extends 
ServiceBaseTest {
         form.setGroupOperateType(GroupOperateType.DELETE);
         taskListenerFactory.acceptPlugin(new MockPlugin());
 
-        WorkflowContext context = workflowEngine.processService()
+        WorkflowContext context = processService
                 .start(ProcessName.DELETE_GROUP_PROCESS.name(), applicant, 
form);
         WorkflowResult result = WorkflowBeanUtils.result(context);
         ProcessResponse view = result.getProcessInfo();
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
index 12938d63d..704d2bc0b 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
@@ -20,6 +20,8 @@ package org.apache.inlong.manager.workflow.core;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.exceptions.WorkflowNoRollbackException;
 import org.apache.inlong.manager.common.exceptions.WorkflowRollbackOnceException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 import org.springframework.transaction.PlatformTransactionManager;
 import org.springframework.transaction.TransactionException;
 import org.springframework.transaction.TransactionStatus;
@@ -34,13 +36,11 @@ import java.lang.reflect.UndeclaredThrowableException;
  * Transaction Helper
  */
 @Slf4j
+@Service
 public class TransactionHelper {
 
-    private final PlatformTransactionManager transactionManager;
-
-    public TransactionHelper(PlatformTransactionManager transactionManager) {
-        this.transactionManager = transactionManager;
-    }
+    @Autowired
+    private PlatformTransactionManager transactionManager;
 
     /**
      * Execute in transaction
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/WorkflowEngine.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/WorkflowEngine.java
deleted file mode 100644
index 12aed2a97..000000000
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/WorkflowEngine.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.workflow.core;
-
-/**
- * Workflow engine
- */
-public interface WorkflowEngine {
-
-    /**
-     * Get process definition service
-     *
-     * @return WorkflowProcess definition service
-     */
-    ProcessDefinitionService processDefinitionService();
-
-    /**
-     * Get process instance service
-     *
-     * @return Process instance service
-     */
-    ProcessService processService();
-
-    /**
-     * Obtain task processing services
-     *
-     * @return task services
-     */
-    TaskService taskService();
-
-    /**
-     * Get event listener service
-     *
-     * @return event listener service
-     */
-    EventListenerService eventListenerService();
-
-}
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/EventListenerServiceImpl.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/EventListenerServiceImpl.java
index 27d33da9f..9762bc40c 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/EventListenerServiceImpl.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/EventListenerServiceImpl.java
@@ -28,29 +28,35 @@ import org.apache.inlong.manager.workflow.core.WorkflowContextBuilder;
 import org.apache.inlong.manager.workflow.definition.Element;
 import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
 import org.apache.inlong.manager.workflow.definition.WorkflowTask;
-import org.apache.inlong.manager.workflow.event.EventListenerManagerFactory;
 import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
+import org.apache.inlong.manager.workflow.event.process.ProcessEventListenerManager;
+import org.apache.inlong.manager.workflow.event.process.ProcessEventNotifier;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
+import org.apache.inlong.manager.workflow.event.task.TaskEventListenerManager;
+import org.apache.inlong.manager.workflow.event.task.TaskEventNotifier;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 /**
  * Event listener service
  */
+@Service
 public class EventListenerServiceImpl implements EventListenerService {
 
-    private final WorkflowContextBuilder workflowContextBuilder;
-    private final WorkflowEventNotifier workflowEventNotifier;
-    private final EventListenerManagerFactory listenerManagerFactory;
-    private final WorkflowEventLogEntityMapper eventLogMapper;
-
-    public EventListenerServiceImpl(WorkflowContextBuilder contextBuilder, 
WorkflowEventNotifier eventNotifier,
-            EventListenerManagerFactory listenerManagerFactory, 
WorkflowEventLogEntityMapper eventLogMapper) {
-        this.workflowContextBuilder = contextBuilder;
-        this.workflowEventNotifier = eventNotifier;
-        this.listenerManagerFactory = listenerManagerFactory;
-        this.eventLogMapper = eventLogMapper;
-    }
+    @Autowired
+    private WorkflowContextBuilder workflowContextBuilder;
+    @Autowired
+    private ProcessEventNotifier processEventNotifier;
+    @Autowired
+    private TaskEventNotifier taskEventNotifier;
+    @Autowired
+    private ProcessEventListenerManager processListenerManager;
+    @Autowired
+    private TaskEventListenerManager taskListenerManager;
+    @Autowired
+    private WorkflowEventLogEntityMapper eventLogMapper;
 
     @Override
     public void executeEventListener(Integer eventLogId) {
@@ -75,7 +81,7 @@ public class EventListenerServiceImpl implements 
EventListenerService {
         ProcessEvent processEvent = 
getProcessEventListener(context.getProcess(), listener).event();
         context.setCurrentElement(getCurrentElement(context.getProcess(), 
processEvent));
 
-        workflowEventNotifier.getProcessEventNotifier().notify(listener, true, context);
+        processEventNotifier.notify(listener, true, context);
     }
 
     @Override
@@ -83,21 +89,21 @@ public class EventListenerServiceImpl implements 
EventListenerService {
         WorkflowContext context = 
workflowContextBuilder.buildContextForTask(taskId, null);
         TaskEventListener eventListener = getTaskEventListener((WorkflowTask) 
context.getCurrentElement(), listener);
         
context.getActionContext().setAction(WorkflowAction.fromTaskEvent(eventListener.event()));
-        workflowEventNotifier.getTaskEventNotifier().notify(listener, true, 
context);
+        taskEventNotifier.notify(listener, true, context);
     }
 
     @Override
     public void triggerProcessEvent(Integer processId, ProcessEvent 
processEvent) {
         WorkflowContext context = 
workflowContextBuilder.buildContextForProcess(processId);
         context.setCurrentElement(getCurrentElement(context.getProcess(), 
processEvent));
-        workflowEventNotifier.getProcessEventNotifier().notify(processEvent, 
context);
+        processEventNotifier.notify(processEvent, context);
     }
 
     @Override
     public void triggerTaskEvent(Integer taskId, TaskEvent taskEvent) {
         WorkflowContext context = workflowContextBuilder
                 .buildContextForTask(taskId, 
WorkflowAction.fromTaskEvent(taskEvent));
-        workflowEventNotifier.getTaskEventNotifier().notify(taskEvent, 
context);
+        taskEventNotifier.notify(taskEvent, context);
     }
 
     private Element getCurrentElement(WorkflowProcess process, ProcessEvent 
processEvent) {
@@ -113,7 +119,7 @@ public class EventListenerServiceImpl implements 
EventListenerService {
             return listener;
         }
 
-        listener = 
listenerManagerFactory.getProcessListenerManager().listener(listenerName);
+        listener = processListenerManager.listener(listenerName);
         Preconditions.checkNotNull(listener, "process listener not exist with 
name: " + listenerName);
         return listener;
     }
@@ -124,7 +130,7 @@ public class EventListenerServiceImpl implements 
EventListenerService {
             return listener;
         }
 
-        listener = 
listenerManagerFactory.getTaskListenerManager().listener(listenerName);
+        listener = taskListenerManager.listener(listenerName);
         Preconditions.checkNotNull(listener, "task listener not exist with 
name: " + listenerName);
         return listener;
     }
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessDefinitionServiceImpl.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessDefinitionServiceImpl.java
index 632d5910c..38bb8a68a 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessDefinitionServiceImpl.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessDefinitionServiceImpl.java
@@ -21,17 +21,17 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.workflow.core.ProcessDefinitionRepository;
 import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
 import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 /**
  * WorkflowProcess definition service
  */
+@Service
 public class ProcessDefinitionServiceImpl implements ProcessDefinitionService {
 
-    private final ProcessDefinitionRepository definitionRepository;
-
-    public ProcessDefinitionServiceImpl(ProcessDefinitionRepository 
definitionRepository) {
-        this.definitionRepository = definitionRepository;
-    }
+    @Autowired
+    private ProcessDefinitionRepository definitionRepository;
 
     @Override
     public void register(WorkflowProcess process) {
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessServiceImpl.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessServiceImpl.java
index 303ab3151..1c67caf1e 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessServiceImpl.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessServiceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.workflow.core.impl;
 
 import org.apache.inlong.manager.common.enums.TaskStatus;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
 import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
@@ -26,28 +27,24 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.core.ProcessService;
 import org.apache.inlong.manager.workflow.core.ProcessorExecutor;
 import org.apache.inlong.manager.workflow.core.WorkflowContextBuilder;
-import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.workflow.definition.WorkflowTask;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 import java.util.List;
 
 /**
  * WorkflowProcess service
  */
+@Service
 public class ProcessServiceImpl implements ProcessService {
 
-    private final ProcessorExecutor processorExecutor;
-    private final WorkflowContextBuilder workflowContextBuilder;
-    private final WorkflowTaskEntityMapper taskEntityMapper;
-
-    public ProcessServiceImpl(
-            ProcessorExecutor processorExecutor,
-            WorkflowContextBuilder workflowContextBuilder,
-            WorkflowTaskEntityMapper taskEntityMapper) {
-        this.processorExecutor = processorExecutor;
-        this.workflowContextBuilder = workflowContextBuilder;
-        this.taskEntityMapper = taskEntityMapper;
-    }
+    @Autowired
+    private ProcessorExecutor processorExecutor;
+    @Autowired
+    private WorkflowContextBuilder workflowContextBuilder;
+    @Autowired
+    private WorkflowTaskEntityMapper taskEntityMapper;
 
     @Override
     public WorkflowContext start(String name, String applicant, ProcessForm 
form) {
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
index 694044946..bf9c055f3 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java
@@ -23,8 +23,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
 import org.apache.inlong.manager.common.exceptions.WorkflowNoRollbackException;
 import 
org.apache.inlong.manager.common.exceptions.WorkflowRollbackOnceException;
-import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
-import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.core.ProcessorExecutor;
 import org.apache.inlong.manager.workflow.core.TransactionHelper;
@@ -38,44 +36,46 @@ import 
org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor;
 import org.apache.inlong.manager.workflow.processor.SkipableElementProcessor;
 import org.apache.inlong.manager.workflow.processor.StartEventProcessor;
 import org.apache.inlong.manager.workflow.processor.UserTaskProcessor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.support.TransactionCallback;
 
+import javax.annotation.PostConstruct;
 import java.util.List;
 
 /**
  * Workload component processor execution
  */
 @Slf4j
+@Service
 public class ProcessorExecutorImpl implements ProcessorExecutor {
 
-    private final ImmutableMap<Class<? extends Element>, ElementProcessor<? 
extends Element>> elementProcessor;
-
-    private final TransactionHelper transactionHelper;
-
-    public ProcessorExecutorImpl(
-            WorkflowProcessEntityMapper processEntityMapper,
-            WorkflowTaskEntityMapper taskEntityMapper,
-            WorkflowEventNotifier workflowEventNotifier,
-            TransactionHelper transactionHelper) {
+    private ImmutableMap<Class<? extends Element>, ElementProcessor<? extends 
Element>> elementProcessor;
+
+    @Autowired
+    private TransactionHelper transactionHelper;
+    @Autowired
+    private StartEventProcessor startEventProcessor;
+    @Autowired
+    private EndEventProcessor endEventProcessor;
+    @Autowired
+    private UserTaskProcessor userTaskProcessor;
+    @Autowired
+    private ServiceTaskProcessor serviceTaskProcessor;
+
+    @PostConstruct
+    private void initProcessors() {
+        List<ElementProcessor<?>> processors = Lists.newArrayList();
+        processors.add(startEventProcessor);
+        processors.add(endEventProcessor);
+        processors.add(userTaskProcessor);
+        processors.add(serviceTaskProcessor);
 
-        List<ElementProcessor<? extends Element>> processors = 
initProcessors(processEntityMapper,
-                taskEntityMapper, workflowEventNotifier);
         ImmutableMap.Builder<Class<? extends Element>, ElementProcessor<? 
extends Element>> builder
                 = ImmutableMap.builder();
         processors.forEach(processor -> builder.put(processor.watch(), 
processor));
         elementProcessor = builder.build();
-        this.transactionHelper = transactionHelper;
-    }
-
-    private List<ElementProcessor<?>> 
initProcessors(WorkflowProcessEntityMapper processEntityMapper,
-            WorkflowTaskEntityMapper taskEntityMapper, WorkflowEventNotifier 
eventNotifier) {
-        List<ElementProcessor<?>> processors = Lists.newArrayList();
-        processors.add(new StartEventProcessor(processEntityMapper, 
eventNotifier));
-        processors.add(new EndEventProcessor(processEntityMapper, 
taskEntityMapper, eventNotifier));
-        processors.add(new UserTaskProcessor(taskEntityMapper, eventNotifier));
-        processors.add(new ServiceTaskProcessor(taskEntityMapper, 
eventNotifier));
-        return processors;
     }
 
     private ElementProcessor<? extends Element> getProcessor(Class<? extends 
Element> elementClazz) {
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/TaskServiceImpl.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/TaskServiceImpl.java
index 9f2c8c116..397c2a9ae 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/TaskServiceImpl.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/TaskServiceImpl.java
@@ -17,28 +17,27 @@
 
 package org.apache.inlong.manager.workflow.core.impl;
 
+import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
 import org.apache.inlong.manager.workflow.WorkflowAction;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.core.ProcessorExecutor;
 import org.apache.inlong.manager.workflow.core.TaskService;
 import org.apache.inlong.manager.workflow.core.WorkflowContextBuilder;
-import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 import java.util.List;
 
 /**
  * WorkflowTask service
  */
+@Service
 public class TaskServiceImpl implements TaskService {
 
-    private final ProcessorExecutor processorExecutor;
-    private final WorkflowContextBuilder contextBuilder;
-
-    public TaskServiceImpl(ProcessorExecutor processorExecutor,
-            WorkflowContextBuilder contextBuilder) {
-        this.processorExecutor = processorExecutor;
-        this.contextBuilder = contextBuilder;
-    }
+    @Autowired
+    private ProcessorExecutor processorExecutor;
+    @Autowired
+    private WorkflowContextBuilder contextBuilder;
 
     @Override
     public WorkflowContext approve(Integer taskId, String remark, TaskForm 
form, String operator) {
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowContextBuilderImpl.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowContextBuilderImpl.java
index 4f5caad25..e62b9ec98 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowContextBuilderImpl.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowContextBuilderImpl.java
@@ -17,10 +17,15 @@
 
 package org.apache.inlong.manager.workflow.core.impl;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.exceptions.JsonException;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
 import org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
@@ -30,11 +35,11 @@ import org.apache.inlong.manager.workflow.WorkflowAction;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.core.ProcessDefinitionRepository;
 import org.apache.inlong.manager.workflow.core.WorkflowContextBuilder;
-import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
 import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
 import org.apache.inlong.manager.workflow.definition.WorkflowTask;
 import org.apache.inlong.manager.workflow.util.WorkflowFormParserUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 import java.util.List;
 import java.util.Map;
@@ -42,19 +47,18 @@ import java.util.Map;
 /**
  * Workflow context builder
  */
+@Slf4j
+@Service
 public class WorkflowContextBuilderImpl implements WorkflowContextBuilder {
 
-    private final ProcessDefinitionRepository definitionRepository;
-    private final WorkflowProcessEntityMapper processEntityMapper;
-    private final WorkflowTaskEntityMapper taskEntityMapper;
-
-    public WorkflowContextBuilderImpl(ProcessDefinitionRepository 
definitionRepository,
-            WorkflowProcessEntityMapper processEntityMapper,
-            WorkflowTaskEntityMapper taskEntityMapper) {
-        this.definitionRepository = definitionRepository;
-        this.processEntityMapper = processEntityMapper;
-        this.taskEntityMapper = taskEntityMapper;
-    }
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private ProcessDefinitionRepository definitionRepository;
+    @Autowired
+    private WorkflowProcessEntityMapper processEntityMapper;
+    @Autowired
+    private WorkflowTaskEntityMapper taskEntityMapper;
 
     @SneakyThrows
     @Override
@@ -78,7 +82,8 @@ public class WorkflowContextBuilderImpl implements 
WorkflowContextBuilder {
         return new WorkflowContext()
                 .setOperator(processEntity.getApplicant())
                 .setProcess(process)
-                
.setProcessForm(WorkflowFormParserUtils.parseProcessForm(processEntity.getFormData(),
 process))
+                .setProcessForm(
+                        WorkflowFormParserUtils.parseProcessForm(objectMapper, 
processEntity.getFormData(), process))
                 .setProcessEntity(processEntity);
     }
 
@@ -104,7 +109,7 @@ public class WorkflowContextBuilderImpl implements 
WorkflowContextBuilder {
     public WorkflowContext buildContextForTask(Integer taskId, WorkflowAction 
action) {
         WorkflowTaskEntity taskEntity = taskEntityMapper.selectById(taskId);
         WorkflowProcess process = 
definitionRepository.get(taskEntity.getProcessName()).clone();
-        TaskForm taskForm = WorkflowFormParserUtils.parseTaskForm(taskEntity, 
process);
+        TaskForm taskForm = 
WorkflowFormParserUtils.parseTaskForm(objectMapper, taskEntity, process);
         List<String> transferToUsers = 
getTransferToUsers(taskEntity.getExtParams());
         return buildContextForTask(taskId, action, taskForm, transferToUsers, 
taskEntity.getRemark(),
                 taskEntity.getOperator());
@@ -118,7 +123,8 @@ public class WorkflowContextBuilderImpl implements 
WorkflowContextBuilder {
 
         WorkflowProcessEntity processEntity = 
processEntityMapper.selectById(taskEntity.getProcessId());
         WorkflowProcess process = 
definitionRepository.get(processEntity.getName()).clone();
-        ProcessForm processForm = 
WorkflowFormParserUtils.parseProcessForm(processEntity.getFormData(), process);
+        ProcessForm processForm = 
WorkflowFormParserUtils.parseProcessForm(objectMapper, 
processEntity.getFormData(),
+                process);
         WorkflowTask task = process.getTaskByName(taskEntity.getName());
 
         return new WorkflowContext().setProcess(process)
@@ -126,29 +132,34 @@ public class WorkflowContextBuilderImpl implements 
WorkflowContextBuilder {
                 .setProcessForm(processForm)
                 .setProcessEntity(processEntity)
                 .setCurrentElement(task)
-                .setActionContext(
-                        new WorkflowContext.ActionContext()
-                                .setAction(action)
-                                .setTaskEntity(taskEntity)
-                                .setTask(task)
-                                .setForm(taskForm)
-                                .setTransferToUsers(transferToUsers)
-                                .setOperator(operator)
-                                .setRemark(remark)
+                .setActionContext(new WorkflowContext.ActionContext()
+                        .setAction(action)
+                        .setTaskEntity(taskEntity)
+                        .setTask(task)
+                        .setForm(taskForm)
+                        .setTransferToUsers(transferToUsers)
+                        .setOperator(operator)
+                        .setRemark(remark)
                 );
     }
 
+    @SuppressWarnings("unchecked")
     private List<String> getTransferToUsers(String ext) {
         if (StringUtils.isEmpty(ext)) {
             return Lists.newArrayList();
         }
-        Map<String, Object> extMap = JsonUtils.parseMap(ext, String.class, Object.class);
-        if (!extMap.containsKey(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY)) {
-            return Lists.newArrayList();
-        }
-
-        if (extMap.get(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY) instanceof 
List) {
-            return (List<String>) 
extMap.get(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY);
+        try {
+            Map<String, Object> extMap = objectMapper.readValue(ext,
+                    objectMapper.getTypeFactory().constructMapType(Map.class, 
String.class, Object.class));
+            if (!extMap.containsKey(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY)) {
+                return Lists.newArrayList();
+            }
+            if (extMap.get(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY) 
instanceof List) {
+                return (List<String>) 
extMap.get(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY);
+            }
+        } catch (JsonProcessingException e) {
+            log.error("parse transfer users error: ", e);
+            throw new JsonException("parse transfer users error");
         }
 
         return null;
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowEngineImpl.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowEngineImpl.java
deleted file mode 100644
index e4f6f3869..000000000
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowEngineImpl.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.workflow.core.impl;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
-import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
-import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
-import org.apache.inlong.manager.workflow.WorkflowConfig;
-import org.apache.inlong.manager.workflow.core.EventListenerService;
-import org.apache.inlong.manager.workflow.core.ProcessDefinitionRepository;
-import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
-import org.apache.inlong.manager.workflow.core.ProcessService;
-import org.apache.inlong.manager.workflow.core.ProcessorExecutor;
-import org.apache.inlong.manager.workflow.core.TaskService;
-import org.apache.inlong.manager.workflow.core.TransactionHelper;
-import org.apache.inlong.manager.workflow.core.WorkflowContextBuilder;
-import org.apache.inlong.manager.workflow.core.WorkflowEngine;
-import org.apache.inlong.manager.workflow.event.EventListenerManagerFactory;
-
-/**
- * Workflow engine
- */
-@Slf4j
-public class WorkflowEngineImpl implements WorkflowEngine {
-
-    private final ProcessDefinitionService processDefService;
-
-    private final ProcessService processService;
-
-    private final TaskService taskService;
-
-    private final EventListenerService eventListenerService;
-
-    /**
-     * Construct WorkflowConfig instance
-     */
-    public WorkflowEngineImpl(WorkflowConfig workflowConfig) {
-        log.info("start init workflow engine with config: {}", 
JsonUtils.toJson(workflowConfig));
-
-        // Database transaction assistant
-        TransactionHelper transactionHelper = new 
TransactionHelper(workflowConfig.getTransactionManager());
-
-        // Workflow event listener manager
-        EventListenerManagerFactory listenerManagerFactory = new 
EventListenerManagerFactory(workflowConfig);
-
-        // Workflow event notifier
-        WorkflowEventNotifier workflowEventNotifier = new 
WorkflowEventNotifier(listenerManagerFactory);
-
-        // Workflow component executor
-        WorkflowProcessEntityMapper processEntityMapper = 
workflowConfig.getProcessEntityMapper();
-        WorkflowTaskEntityMapper taskEntityMapper = 
workflowConfig.getTaskEntityMapper();
-        ProcessorExecutor processorExecutor = new 
ProcessorExecutorImpl(processEntityMapper,
-                taskEntityMapper, workflowEventNotifier, transactionHelper);
-
-        ProcessDefinitionRepository processDefRepository = 
workflowConfig.getDefinitionRepository();
-
-        // Workflow context builder
-        WorkflowContextBuilder contextBuilder = new WorkflowContextBuilderImpl(
-                processDefRepository, processEntityMapper, taskEntityMapper);
-
-        // Workflow process definition service
-        this.processDefService = new 
ProcessDefinitionServiceImpl(processDefRepository);
-
-        // Workflow process instance service
-        this.processService = new ProcessServiceImpl(processorExecutor, 
contextBuilder, taskEntityMapper);
-
-        // Workflow task service
-        this.taskService = new TaskServiceImpl(processorExecutor, 
contextBuilder);
-
-        // Workflow event listener service
-        WorkflowEventLogEntityMapper eventLogMapper = 
workflowConfig.getEventLogMapper();
-        this.eventListenerService = new 
EventListenerServiceImpl(contextBuilder, workflowEventNotifier,
-                listenerManagerFactory, eventLogMapper);
-
-        log.info("success init workflow engine");
-    }
-
-    @Override
-    public ProcessDefinitionService processDefinitionService() {
-        return processDefService;
-    }
-
-    @Override
-    public ProcessService processService() {
-        return processService;
-    }
-
-    @Override
-    public TaskService taskService() {
-        return taskService;
-    }
-
-    @Override
-    public EventListenerService eventListenerService() {
-        return eventListenerService;
-    }
-
-}
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowEventNotifier.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowEventNotifier.java
deleted file mode 100644
index 9fd52f0de..000000000
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowEventNotifier.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.workflow.core.impl;
-
-import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
-import org.apache.inlong.manager.workflow.event.EventListenerManagerFactory;
-import org.apache.inlong.manager.workflow.event.process.ProcessEventNotifier;
-import org.apache.inlong.manager.workflow.event.task.TaskEventNotifier;
-
-/**
- * Workflow event notifier
- */
-public class WorkflowEventNotifier {
-
-    private final ProcessEventNotifier processEventNotifier;
-
-    private final TaskEventNotifier taskEventNotifier;
-
-    public WorkflowEventNotifier(EventListenerManagerFactory factory) {
-        WorkflowEventLogEntityMapper eventLogMapper = 
factory.getEventLogMapper();
-        taskEventNotifier = new 
TaskEventNotifier(factory.getTaskListenerManager(), eventLogMapper);
-        processEventNotifier = new 
ProcessEventNotifier(factory.getProcessListenerManager(), eventLogMapper);
-    }
-
-    public ProcessEventNotifier getProcessEventNotifier() {
-        return processEventNotifier;
-    }
-
-    public TaskEventNotifier getTaskEventNotifier() {
-        return taskEventNotifier;
-    }
-
-}
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
index 60be006c5..55b6c16f6 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.workflow.core.impl;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
@@ -35,6 +36,7 @@ import 
org.apache.inlong.manager.common.pojo.workflow.TaskQuery;
 import org.apache.inlong.manager.common.pojo.workflow.TaskResponse;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverQuery;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowBriefDTO;
+import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
 import org.apache.inlong.manager.dao.entity.WorkflowApproverEntity;
 import org.apache.inlong.manager.dao.entity.WorkflowEventLogEntity;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
@@ -49,7 +51,6 @@ import 
org.apache.inlong.manager.workflow.core.WorkflowQueryService;
 import org.apache.inlong.manager.workflow.definition.Element;
 import org.apache.inlong.manager.workflow.definition.NextableElement;
 import org.apache.inlong.manager.workflow.definition.StartEvent;
-import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
 import org.apache.inlong.manager.workflow.definition.UserTask;
 import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
 import org.apache.inlong.manager.workflow.definition.WorkflowTask;
@@ -81,6 +82,8 @@ public class WorkflowQueryServiceImpl implements 
WorkflowQueryService {
     private WorkflowEventLogEntityMapper eventLogMapper;
     @Autowired
     private WorkflowApproverEntityMapper approverMapper;
+    @Autowired
+    private ObjectMapper objectMapper;
 
     @Override
     public WorkflowProcessEntity getProcessEntity(Integer processId) {
@@ -240,7 +243,7 @@ public class WorkflowQueryServiceImpl implements 
WorkflowQueryService {
         elementDTO.setName(startEvent.getName());
         elementDTO.setDisplayName(startEvent.getDisplayName());
 
-        WorkflowContext context = WorkflowBeanUtils.buildContext(process, 
processEntity);
+        WorkflowContext context = WorkflowBeanUtils.buildContext(objectMapper, 
process, processEntity);
         addNext(startEvent, elementDTO, context, nameStatusMap);
 
         WorkflowBriefDTO briefDTO = new WorkflowBriefDTO();
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
index c66a8a7e3..91c1060bf 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
@@ -20,13 +20,15 @@ package org.apache.inlong.manager.workflow.definition;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
-import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.workflow.WorkflowAction;
 import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.event.EventListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
 import org.springframework.util.CollectionUtils;
 
@@ -43,6 +45,8 @@ import java.util.stream.Collectors;
 @Slf4j
 public class ServiceTask extends WorkflowTask {
 
+    public static final Gson GSON = new GsonBuilder().create();
+
     private static final Set<WorkflowAction> SUPPORTED_ACTIONS = ImmutableSet
             .of(WorkflowAction.COMPLETE, WorkflowAction.CANCEL, 
WorkflowAction.TERMINATE);
 
@@ -101,7 +105,7 @@ public class ServiceTask extends WorkflowTask {
                     try {
                         return (ConditionNextElement) ele.clone();
                     } catch (CloneNotSupportedException e) {
-                        e.printStackTrace();
+                        log.error("clone service task error: ", e);
                     }
                     return null;
                 }).collect(Collectors.toList())));
@@ -124,9 +128,9 @@ public class ServiceTask extends WorkflowTask {
             }
             List<TaskEventListener> listeners = Lists.newArrayList(
                     listenerProvider.get(workflowContext, serviceTaskType));
-            log.info("ServiceTask:{} is init for listeners:{}", getName(),
-                    JsonUtils.toJson(listeners.stream().map(listener -> 
listener.name()).collect(
-                            Collectors.toList())));
+
+            List<String> listenerNames = 
listeners.stream().map(EventListener::name).collect(Collectors.toList());
+            log.info("ServiceTask:{} is init for listeners:{}", getName(), 
GSON.toJson(listenerNames));
             addListeners(listeners);
         } else {
             log.info("ServiceTask:{} is already init", getName());
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/EventListenerManagerFactory.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/EventListenerManagerFactory.java
deleted file mode 100644
index 84b69d402..000000000
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/EventListenerManagerFactory.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.workflow.event;
-
-import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
-import org.apache.inlong.manager.workflow.WorkflowConfig;
-import 
org.apache.inlong.manager.workflow.event.process.ProcessEventListenerManager;
-import org.apache.inlong.manager.workflow.event.task.TaskEventListenerManager;
-
-/**
- * Factory of workflow event listener manager
- */
-public class EventListenerManagerFactory {
-
-    private final ProcessEventListenerManager processListenerManager;
-    private final TaskEventListenerManager taskListenerManager;
-    private final WorkflowEventLogEntityMapper eventLogMapper;
-
-    public EventListenerManagerFactory(WorkflowConfig workflowConfig) {
-        this.processListenerManager = new ProcessEventListenerManager();
-        this.eventLogMapper = workflowConfig.getEventLogMapper();
-        this.taskListenerManager = new 
TaskEventListenerManager(eventLogMapper);
-    }
-
-    /**
-     * Get process event listener manager.
-     */
-    public ProcessEventListenerManager getProcessListenerManager() {
-        return processListenerManager;
-    }
-
-    /**
-     * Get task event listener manager.
-     */
-    public TaskEventListenerManager getTaskListenerManager() {
-        return taskListenerManager;
-    }
-
-    /**
-     * Get workflow event listener manager.
-     */
-    public WorkflowEventLogEntityMapper getEventLogMapper() {
-        return eventLogMapper;
-    }
-
-}
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/LogableEventListener.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/LogableEventListener.java
index cf42e04f4..105543049 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/LogableEventListener.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/LogableEventListener.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.workflow.event;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.EventStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.NetworkUtils;
 import org.apache.inlong.manager.dao.entity.WorkflowEventLogEntity;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
@@ -63,10 +62,10 @@ public abstract class LogableEventListener<EventType 
extends WorkflowEvent> impl
         WorkflowEventLogEntity workflowEventLogEntity = buildEventLog(context);
         try {
             ListenerResult result = eventListener.listen(context);
-            log.debug("listener execute result:{} - {}", 
workflowEventLogEntity, result);
+            log.debug("listener execute result: {} - {}", 
workflowEventLogEntity, result);
             return result;
         } catch (Exception e) {
-            log.error("listener exception:{}", workflowEventLogEntity, e);
+            log.error("execute listener {} error: {}", workflowEventLogEntity, 
e);
             if (!async()) {
                 throw new WorkflowListenerException(e);
             }
@@ -87,7 +86,7 @@ public abstract class LogableEventListener<EventType extends 
WorkflowEvent> impl
         } catch (Exception e) {
             logEntity.setStatus(EventStatus.FAILED.getStatus());
             logEntity.setException(e.getMessage());
-            log.error("listener exception:{}", JsonUtils.toJson(logEntity), e);
+            log.error("execute listener {} error: {}", logEntity, e);
             if (!async()) {
                 throw new WorkflowListenerException(e.getMessage());
             }
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListenerManager.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListenerManager.java
index ae519911b..ec567362e 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListenerManager.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListenerManager.java
@@ -22,6 +22,8 @@ import com.google.common.collect.Maps;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
 import org.apache.inlong.manager.workflow.event.EventListenerManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 import java.util.List;
 import java.util.Map;
@@ -29,6 +31,7 @@ import java.util.Map;
 /**
  * System default process event listener manager
  */
+@Service
 public class ProcessEventListenerManager implements 
EventListenerManager<ProcessEvent, ProcessEventListener> {
 
     private static final List<ProcessEventListener> EMPTY = 
Lists.newArrayList();
@@ -36,6 +39,7 @@ public class ProcessEventListenerManager implements 
EventListenerManager<Process
     private final Map<ProcessEvent, List<ProcessEventListener>> 
asyncProcessEventListeners = Maps.newHashMap();
     private final Map<String, ProcessEventListener> processEventListeners = 
Maps.newHashMap();
 
+    @Autowired
     private WorkflowEventLogEntityMapper eventLogMapper;
 
     @Override
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java
index e4050c8b1..ac9cd10ed 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java
@@ -25,6 +25,8 @@ import 
org.apache.inlong.manager.workflow.definition.WorkflowProcess;
 import org.apache.inlong.manager.workflow.event.EventListenerManager;
 import org.apache.inlong.manager.workflow.event.EventListenerNotifier;
 import org.apache.inlong.manager.workflow.event.LogableEventListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
@@ -38,6 +40,7 @@ import java.util.function.Consumer;
  * WorkflowProcess event notifier
  */
 @Slf4j
+@Service
 public class ProcessEventNotifier implements 
EventListenerNotifier<ProcessEvent> {
 
     private final ExecutorService executorService = new ThreadPoolExecutor(
@@ -49,13 +52,10 @@ public class ProcessEventNotifier implements 
EventListenerNotifier<ProcessEvent>
             new 
ThreadFactoryBuilder().setNameFormat("async-process-event-notifier-%s").build(),
             new CallerRunsPolicy());
 
-    private final EventListenerManager<ProcessEvent, ProcessEventListener> 
eventListenerManager;
-    private final WorkflowEventLogEntityMapper eventLogMapper;
-
-    public ProcessEventNotifier(ProcessEventListenerManager manager, 
WorkflowEventLogEntityMapper eventLogMapper) {
-        this.eventListenerManager = manager;
-        this.eventLogMapper = eventLogMapper;
-    }
+    @Autowired
+    private EventListenerManager<ProcessEvent, ProcessEventListener> 
eventListenerManager;
+    @Autowired
+    private WorkflowEventLogEntityMapper eventLogMapper;
 
     @Override
     public void notify(ProcessEvent event, WorkflowContext context) {
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListenerManager.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListenerManager.java
index 8408861fe..0373cc350 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListenerManager.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListenerManager.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Maps;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
 import org.apache.inlong.manager.workflow.event.EventListenerManager;
+import org.springframework.stereotype.Service;
 
 import java.util.List;
 import java.util.Map;
@@ -29,6 +30,7 @@ import java.util.Map;
 /**
  * Internal default task listener management
  */
+@Service
 public class TaskEventListenerManager implements 
EventListenerManager<TaskEvent, TaskEventListener> {
 
     private final Map<TaskEvent, List<TaskEventListener>> 
syncTaskEventListeners = Maps.newHashMap();
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java
index 6a3144f2c..3382203c4 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java
@@ -25,6 +25,8 @@ import 
org.apache.inlong.manager.workflow.definition.WorkflowTask;
 import org.apache.inlong.manager.workflow.event.EventListenerManager;
 import org.apache.inlong.manager.workflow.event.EventListenerNotifier;
 import org.apache.inlong.manager.workflow.event.LogableEventListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
@@ -38,6 +40,7 @@ import java.util.function.Consumer;
  * WorkflowProcess event notifier
  */
 @Slf4j
+@Service
 public class TaskEventNotifier implements EventListenerNotifier<TaskEvent> {
 
     private final ExecutorService executorService = new ThreadPoolExecutor(
@@ -49,8 +52,10 @@ public class TaskEventNotifier implements 
EventListenerNotifier<TaskEvent> {
             new 
ThreadFactoryBuilder().setNameFormat("async-task-event-notifier-%s").build(),
             new CallerRunsPolicy());
 
-    private final EventListenerManager<TaskEvent, TaskEventListener> 
eventListenerManager;
-    private final WorkflowEventLogEntityMapper eventLogMapper;
+    @Autowired
+    private EventListenerManager<TaskEvent, TaskEventListener> 
eventListenerManager;
+    @Autowired
+    private WorkflowEventLogEntityMapper eventLogMapper;
 
     public TaskEventNotifier(TaskEventListenerManager listenerManager, 
WorkflowEventLogEntityMapper eventLogMapper) {
         this.eventListenerManager = listenerManager;
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractNextableElementProcessor.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractNextableElementProcessor.java
index c79a1fe32..ebfd091fc 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractNextableElementProcessor.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractNextableElementProcessor.java
@@ -32,8 +32,7 @@ import java.util.stream.Collectors;
 /**
  * Non-terminal element processor
  */
-public abstract class AbstractNextableElementProcessor<T extends 
NextableElement> implements
-        ElementProcessor<T> {
+public abstract class AbstractNextableElementProcessor<T extends 
NextableElement> implements ElementProcessor<T> {
 
     @Override
     public List<Element> next(T element, WorkflowContext context) {
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractTaskProcessor.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractTaskProcessor.java
index 64229889d..77051a026 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractTaskProcessor.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/AbstractTaskProcessor.java
@@ -26,6 +26,7 @@ import 
org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.definition.ApproverAssign;
 import org.apache.inlong.manager.workflow.definition.WorkflowTask;
+import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.Date;
 
@@ -35,12 +36,9 @@ import java.util.Date;
 public abstract class AbstractTaskProcessor<T extends WorkflowTask> extends
         AbstractNextableElementProcessor<T> implements 
SkipableElementProcessor<T> {
 
+    @Autowired
     protected WorkflowTaskEntityMapper taskEntityMapper;
 
-    protected AbstractTaskProcessor(WorkflowTaskEntityMapper taskEntityMapper) 
{
-        this.taskEntityMapper = taskEntityMapper;
-    }
-
     @Override
     public void skip(T task, WorkflowContext context) {
         WorkflowProcessEntity workflowProcessEntity = 
context.getProcessEntity();
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/EndEventProcessor.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/EndEventProcessor.java
index 60c5605cf..9e22967a2 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/EndEventProcessor.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/EndEventProcessor.java
@@ -27,11 +27,12 @@ import 
org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
 import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
 import org.apache.inlong.manager.workflow.WorkflowAction;
 import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.core.impl.WorkflowEventNotifier;
 import org.apache.inlong.manager.workflow.definition.Element;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.event.process.ProcessEventNotifier;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
 
 import java.util.Collections;
@@ -42,19 +43,15 @@ import java.util.List;
  * End event handler
  */
 @Slf4j
+@Service
 public class EndEventProcessor implements ElementProcessor<EndEvent> {
 
-    private final WorkflowProcessEntityMapper processEntityMapper;
-    private final WorkflowTaskEntityMapper taskEntityMapper;
-    private final ProcessEventNotifier processEventNotifier;
-
-    public EndEventProcessor(WorkflowProcessEntityMapper processEntityMapper,
-            WorkflowTaskEntityMapper taskEntityMapper,
-            WorkflowEventNotifier eventNotifier) {
-        this.processEntityMapper = processEntityMapper;
-        this.taskEntityMapper = taskEntityMapper;
-        this.processEventNotifier = eventNotifier.getProcessEventNotifier();
-    }
+    @Autowired
+    private ProcessEventNotifier processEventNotifier;
+    @Autowired
+    private WorkflowTaskEntityMapper taskEntityMapper;
+    @Autowired
+    private WorkflowProcessEntityMapper processEntityMapper;
 
     @Override
     public Class<EndEvent> watch() {
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
index fe8a02d7d..c779ba04d 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
@@ -17,17 +17,19 @@
 
 package org.apache.inlong.manager.workflow.processor;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableSet;
+import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.TaskStatus;
-import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.exceptions.JsonException;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
 import org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
 import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
 import org.apache.inlong.manager.workflow.WorkflowAction;
 import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.core.impl.WorkflowEventNotifier;
+import org.apache.inlong.manager.workflow.WorkflowContext.ActionContext;
 import org.apache.inlong.manager.workflow.definition.ApproverAssign;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.WorkflowTask;
@@ -35,6 +37,8 @@ import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.event.process.ProcessEventNotifier;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 import org.apache.inlong.manager.workflow.event.task.TaskEventNotifier;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 import java.util.Date;
 import java.util.List;
@@ -43,6 +47,8 @@ import java.util.Set;
 /**
  * System task processor
  */
+@Service
+@NoArgsConstructor
 public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
 
     private static final Set<WorkflowAction> SUPPORT_ACTIONS = ImmutableSet.of(
@@ -53,14 +59,14 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
             TaskStatus.PENDING, TaskStatus.FAILED
     );
 
-    private final TaskEventNotifier taskEventNotifier;
-    private final ProcessEventNotifier processEventNotifier;
-
-    public ServiceTaskProcessor(WorkflowTaskEntityMapper taskEntityMapper, WorkflowEventNotifier eventNotifier) {
-        super(taskEntityMapper);
-        this.taskEventNotifier = eventNotifier.getTaskEventNotifier();
-        this.processEventNotifier = eventNotifier.getProcessEventNotifier();
-    }
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private WorkflowTaskEntityMapper taskEntityMapper;
+    @Autowired
+    private TaskEventNotifier taskEventNotifier;
+    @Autowired
+    private ProcessEventNotifier processEventNotifier;
 
     @Override
     public Class<ServiceTask> watch() {
@@ -128,12 +134,15 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
         return taskEntity;
     }
 
-    private void completeTaskEntity(WorkflowContext.ActionContext actionContext, WorkflowTaskEntity taskEntity,
-            TaskStatus taskStatus) {
+    private void completeTaskEntity(ActionContext actionContext, WorkflowTaskEntity taskEntity, TaskStatus taskStatus) {
         taskEntity.setStatus(taskStatus.name());
         taskEntity.setOperator(taskEntity.getApprovers());
         taskEntity.setRemark(actionContext.getRemark());
-        taskEntity.setFormData(JsonUtils.toJson(actionContext.getForm()));
+        try {
+            taskEntity.setFormData(objectMapper.writeValueAsString(actionContext.getForm()));
+        } catch (Exception e) {
+            throw new JsonException("write form to json error: ", e);
+        }
         taskEntity.setEndTime(new Date());
         taskEntityMapper.update(taskEntity);
     }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
index 20fc2ea18..7619c2c77 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
@@ -17,35 +17,36 @@
 
 package org.apache.inlong.manager.workflow.processor;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
-import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.exceptions.JsonException;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
 import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
 import org.apache.inlong.manager.workflow.WorkflowAction;
 import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.workflow.definition.StartEvent;
 import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
 import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.event.process.ProcessEventNotifier;
-import org.apache.inlong.manager.workflow.core.impl.WorkflowEventNotifier;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 import java.util.Date;
 
 /**
  * Start event handler
  */
+@Service
 public class StartEventProcessor extends AbstractNextableElementProcessor<StartEvent> {
 
-    private final WorkflowProcessEntityMapper processEntityMapper;
-
-    private final ProcessEventNotifier processEventNotifier;
-
-    public StartEventProcessor(WorkflowProcessEntityMapper processEntityMapper, WorkflowEventNotifier eventNotifier) {
-        this.processEntityMapper = processEntityMapper;
-        this.processEventNotifier = eventNotifier.getProcessEventNotifier();
-    }
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private ProcessEventNotifier processEventNotifier;
+    @Autowired
+    private WorkflowProcessEntityMapper processEntityMapper;
 
     @Override
     public Class<StartEvent> watch() {
@@ -90,7 +91,11 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE
         processEntity.setInlongGroupId(form.getInlongGroupId());
         processEntity.setApplicant(applicant);
         processEntity.setStatus(ProcessStatus.PROCESSING.name());
-        processEntity.setFormData(JsonUtils.toJson(form));
+        try {
+            processEntity.setFormData(objectMapper.writeValueAsString(form));
+        } catch (Exception e) {
+            throw new JsonException("write form to json error: ", e);
+        }
         processEntity.setStartTime(new Date());
         processEntity.setHidden(process.getHidden());
 
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/UserTaskProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/UserTaskProcessor.java
index ec64c5611..6fe6ba2b5 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/UserTaskProcessor.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/UserTaskProcessor.java
@@ -17,51 +17,54 @@
 
 package org.apache.inlong.manager.workflow.processor;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.TaskStatus;
+import org.apache.inlong.manager.common.exceptions.JsonException;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
-import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
 import org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
-import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
 import org.apache.inlong.manager.workflow.WorkflowAction;
 import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.core.impl.WorkflowEventNotifier;
 import org.apache.inlong.manager.workflow.definition.Element;
 import org.apache.inlong.manager.workflow.definition.UserTask;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 import org.apache.inlong.manager.workflow.event.task.TaskEventNotifier;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 /**
  * User task processor
  */
+@Slf4j
+@Service
 public class UserTaskProcessor extends AbstractTaskProcessor<UserTask> {
 
     private static final Set<WorkflowAction> SHOULD_CHECK_OPERATOR_ACTIONS = ImmutableSet
             .of(WorkflowAction.APPROVE, WorkflowAction.REJECT, WorkflowAction.TRANSFER);
-
     private static final Set<WorkflowAction> SUPPORT_ACTIONS = ImmutableSet.of(
             WorkflowAction.APPROVE, WorkflowAction.REJECT, WorkflowAction.TRANSFER, WorkflowAction.CANCEL,
             WorkflowAction.TERMINATE
     );
-    private final TaskEventNotifier taskEventNotifier;
 
-    public UserTaskProcessor(WorkflowTaskEntityMapper taskEntityMapper, WorkflowEventNotifier eventNotifier) {
-        super(taskEntityMapper);
-        this.taskEventNotifier = eventNotifier.getTaskEventNotifier();
-    }
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private TaskEventNotifier taskEventNotifier;
 
     @Override
     public Class<UserTask> watch() {
@@ -170,17 +173,33 @@ public class UserTaskProcessor extends AbstractTaskProcessor<UserTask> {
         taskEntity.setRemark(actionContext.getRemark());
 
         UserTask userTask = (UserTask) actionContext.getTask();
-        if (needForm(userTask, actionContext.getAction())) {
-            Preconditions.checkNotNull(actionContext.getForm(), "form cannot be null");
-            Preconditions.checkTrue(actionContext.getForm().getClass().isAssignableFrom(userTask.getFormClass()),
-                    "form type not match, should be class " + userTask.getFormClass());
-            actionContext.getForm().validate();
-            taskEntity.setFormData(JsonUtils.toJson(actionContext.getForm()));
-        } else {
-            Preconditions.checkNull(actionContext.getForm(), "no form required");
+        try {
+            TaskForm taskForm = actionContext.getForm();
+            if (needForm(userTask, actionContext.getAction())) {
+                Preconditions.checkNotNull(taskForm, "form cannot be null");
+                Preconditions.checkTrue(taskForm.getClass().isAssignableFrom(userTask.getFormClass()),
+                        "form type not match, should be class " + userTask.getFormClass());
+                taskForm.validate();
+                taskEntity.setFormData(objectMapper.writeValueAsString(taskForm));
+            } else {
+                Preconditions.checkNull(taskForm, "no form required");
+            }
+            taskEntity.setEndTime(new Date());
+
+            Map<String, Object> extMap = new HashMap<>();
+            if (StringUtils.isNotBlank(taskEntity.getExtParams())) {
+                extMap = objectMapper.readValue(taskEntity.getExtParams(),
+                        objectMapper.getTypeFactory().constructMapType(Map.class, String.class, Object.class));
+                if (WorkflowAction.TRANSFER.equals(actionContext.getAction())) {
+                    extMap.put(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY, actionContext.getTransferToUsers());
+                }
+            }
+            String extParams = objectMapper.writeValueAsString(extMap);
+            taskEntity.setExtParams(extParams);
+        } catch (JsonProcessingException e) {
+            log.error("parse transfer users error: ", e);
+            throw new JsonException("parse transfer users error");
         }
-        taskEntity.setEndTime(new Date());
-        taskEntity.setExtParams(handlerExt(actionContext, taskEntity.getExtParams()));
         taskEntityMapper.update(taskEntity);
     }
 
@@ -192,18 +211,6 @@ public class UserTaskProcessor extends AbstractTaskProcessor<UserTask> {
         return WorkflowAction.APPROVE.equals(workflowAction) || WorkflowAction.COMPLETE.equals(workflowAction);
     }
 
-    private String handlerExt(WorkflowContext.ActionContext actionContext, String oldExt) {
-        Map<String, Object> extMap = Optional.ofNullable(oldExt)
-                .map(e -> JsonUtils.parseMap(oldExt, String.class, Object.class))
-                .orElseGet(Maps::newHashMap);
-
-        if (WorkflowAction.TRANSFER.equals(actionContext.getAction())) {
-            extMap.put(WorkflowTaskEntity.EXT_TRANSFER_USER_KEY, actionContext.getTransferToUsers());
-        }
-
-        return JsonUtils.toJson(extMap);
-    }
-
     private TaskStatus toTaskState(WorkflowAction workflowAction) {
         switch (workflowAction) {
             case APPROVE:
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowBeanUtils.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowBeanUtils.java
index cebf3f2a5..c89475860 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowBeanUtils.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowBeanUtils.java
@@ -17,16 +17,20 @@
 
 package org.apache.inlong.manager.workflow.util;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.common.enums.TaskStatus;
+import org.apache.inlong.manager.common.exceptions.JsonException;
 import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
 import org.apache.inlong.manager.common.pojo.workflow.TaskResponse;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
-import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
 import org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
 import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,15 +44,17 @@ import java.util.stream.Collectors;
  */
 public class WorkflowBeanUtils {
 
+    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowBeanUtils.class);
 
     /**
      * Build workflow context from WorkflowProcess and WorkflowProcessEntity
      */
-    public static WorkflowContext buildContext(WorkflowProcess process, WorkflowProcessEntity processEntity) {
+    public static WorkflowContext buildContext(ObjectMapper objectMapper, WorkflowProcess process,
+            WorkflowProcessEntity processEntity) {
         ProcessForm processForm = null;
         try {
-            processForm = WorkflowFormParserUtils.parseProcessForm(processEntity.getFormData(), process);
+            processForm = WorkflowFormParserUtils.parseProcessForm(objectMapper, processEntity.getFormData(), process);
         } catch (Exception e) {
             LOGGER.error("build context from process form failed with id: {}", processEntity.getId(), e);
         }
@@ -91,7 +97,8 @@ public class WorkflowBeanUtils {
         if (entity == null) {
             return null;
         }
-        return ProcessResponse.builder()
+
+        ProcessResponse processResponse = ProcessResponse.builder()
                 .id(entity.getId())
                 .name(entity.getName())
                 .displayName(entity.getDisplayName())
@@ -101,9 +108,26 @@ public class WorkflowBeanUtils {
                 .status(ProcessStatus.valueOf(entity.getStatus()))
                 .startTime(entity.getStartTime())
                 .endTime(entity.getEndTime())
-                .formData(JsonUtils.parse(entity.getFormData()))
-                .extParams(JsonUtils.parse(entity.getExtParams()))
                 .build();
+
+        try {
+            JsonNode formData = null;
+            if (StringUtils.isNotBlank(entity.getFormData())) {
+                formData = OBJECT_MAPPER.readTree(entity.getFormData());
+            }
+            processResponse.setFormData(formData);
+
+            JsonNode extParams = null;
+            if (StringUtils.isNotBlank(entity.getExtParams())) {
+                extParams = OBJECT_MAPPER.readTree(entity.getExtParams());
+            }
+            processResponse.setExtParams(extParams);
+        } catch (JsonProcessingException e) {
+            LOGGER.error("parse form data error: ", e);
+            throw new JsonException("parse form data or ext params error");
+        }
+
+        return processResponse;
     }
 
     /**
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
index 49d6b13a4..5853c47c8 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
@@ -18,14 +18,14 @@
 package org.apache.inlong.manager.workflow.util;
 
 import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.exceptions.FormParseException;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.WorkflowTaskEntity;
 import org.apache.inlong.manager.workflow.definition.UserTask;
 import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
 import org.apache.inlong.manager.workflow.definition.WorkflowTask;
@@ -39,8 +39,8 @@ public class WorkflowFormParserUtils {
     /**
      * Parse the task form in JSON string format into a WorkflowTask instance
      */
-    public static <T extends TaskForm> T parseTaskForm(WorkflowTaskEntity workflowTaskEntity, WorkflowProcess process)
-            throws FormParseException {
+    public static <T extends TaskForm> T parseTaskForm(ObjectMapper objectMapper,
+            WorkflowTaskEntity workflowTaskEntity, WorkflowProcess process) throws FormParseException {
         Preconditions.checkNotNull(workflowTaskEntity, "workflowTaskEntity cannot be null");
         Preconditions.checkNotNull(process, "process cannot be null");
 
@@ -54,10 +54,10 @@ public class WorkflowFormParserUtils {
 
         UserTask userTask = (UserTask) task;
         try {
-            JavaType javaType = JsonUtils.OBJECT_MAPPER.constructType(userTask.getFormClass());
-            return JsonUtils.parse(workflowTaskEntity.getFormData(), javaType);
+            JavaType javaType = objectMapper.constructType(userTask.getFormClass());
+            return objectMapper.readValue(workflowTaskEntity.getFormData(), javaType);
         } catch (Exception e) {
-            log.error("task form parse failed, form is: {}", workflowTaskEntity.getFormData(), e);
+            log.error("task parsed failed for form {}", workflowTaskEntity.getFormData(), e);
             throw new FormParseException("task form parse failed");
         }
     }
@@ -65,8 +65,8 @@ public class WorkflowFormParserUtils {
     /**
      * Parse the process form in JSON string format into a WorkflowProcess instance
      */
-    public static <T extends ProcessForm> T parseProcessForm(String form, WorkflowProcess process)
-            throws FormParseException {
+    public static <T extends ProcessForm> T parseProcessForm(ObjectMapper objectMapper, String form,
+            WorkflowProcess process) throws FormParseException {
         Preconditions.checkNotNull(process, "process cannot be null");
 
         if (StringUtils.isEmpty(form)) {
@@ -74,8 +74,8 @@ public class WorkflowFormParserUtils {
         }
 
         try {
-            JavaType javaType = JsonUtils.OBJECT_MAPPER.constructType(process.getFormClass());
-            return JsonUtils.parse(form, javaType);
+            JavaType javaType = objectMapper.constructType(process.getFormClass());
+            return objectMapper.readValue(form, javaType);
         } catch (Exception e) {
             log.error("process form parse failed, form is: {}", form, e);
             throw new FormParseException("process form parse failed");

Reply via email to