This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 0ba3c5e5d [ISSUE #5075] update eventmesh-admin-server create task
response (#5076)
0ba3c5e5d is described below
commit 0ba3c5e5d363620b0625d146e4cb73a67fcd1473
Author: mike_xwm <[email protected]>
AuthorDate: Thu Aug 8 19:18:02 2024 +0800
[ISSUE #5075] update eventmesh-admin-server create task response (#5076)
* [ISSUE #5073] Fix eventmesh-admin-server createTask response error
* update eventmesh.sql
* [ISSUE #5075] update eventmesh-admin-server create task response #5075
---
.../eventmesh/admin/server/web/HttpServer.java | 5 +-
.../server/web/service/task/TaskBizService.java | 80 ++++++++++++++++------
.../common/remote/request/CreateTaskRequest.java | 2 +
.../common/remote/response/CreateTaskResponse.java | 12 ++++
.../apache/eventmesh/common/utils/JsonUtils.java | 4 ++
5 files changed, 79 insertions(+), 24 deletions(-)
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
index b79ac5ae8..12afb3a3d 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.admin.server.web;
import org.apache.eventmesh.admin.server.web.service.task.TaskBizService;
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
+import org.apache.eventmesh.common.remote.response.CreateTaskResponse;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -37,8 +38,8 @@ public class HttpServer {
@RequestMapping(value = "/createTask", method = RequestMethod.POST)
public ResponseEntity<Object> createOrUpdateTask(@RequestBody
CreateTaskRequest task) {
- String uuid = taskService.createTask(task);
- return
ResponseEntity.ok(JsonUtils.toJSONString(Response.success(uuid)));
+ CreateTaskResponse createTaskResponse = taskService.createTask(task);
+ return
ResponseEntity.ok(JsonUtils.toJSONString(Response.success(createTaskResponse)));
}
public boolean deleteTask(Long id) {
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
index 7089f9cf7..7bc16ba4a 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
@@ -18,6 +18,8 @@
package org.apache.eventmesh.admin.server.web.service.task;
import org.apache.eventmesh.admin.server.AdminServerProperties;
+import org.apache.eventmesh.admin.server.web.Response;
+import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo;
import
org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService;
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
@@ -27,10 +29,12 @@ import org.apache.eventmesh.common.remote.TaskState;
import org.apache.eventmesh.common.remote.datasource.DataSource;
import org.apache.eventmesh.common.remote.datasource.DataSourceType;
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
+import org.apache.eventmesh.common.remote.response.CreateTaskResponse;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -55,8 +59,18 @@ public class TaskBizService {
@Autowired
private AdminServerProperties properties;
+ private static final String TYPE = "type";
+
+ private static final String DESC = "desc";
+
+ private static final String CONF_CLAZZ = "confClazz";
+
+ private static final String CONF = "conf";
+
+ private static final String REGION = "region";
+
@Transactional
- public String createTask(CreateTaskRequest req) {
+ public CreateTaskResponse createTask(CreateTaskRequest req) {
String taskID = req.getTaskId();
if (StringUtils.isEmpty(taskID)) {
taskID = UUID.randomUUID().toString();
@@ -64,8 +78,9 @@ public class TaskBizService {
}
String targetRegion = req.getTargetRegion();
+ String remoteResponse = "";
// not from other admin && target not equals with self region
- if (!req.isFlag() && !StringUtils.equals(properties.getRegion(),
targetRegion)) {
+ if (!req.isFlag() && !properties.getRegion().equals(targetRegion)) {
List<String> adminServerList =
properties.getAdminServerList().get(targetRegion);
if (adminServerList == null || adminServerList.isEmpty()) {
throw new RuntimeException("No admin server available for
region: " + targetRegion);
@@ -78,6 +93,7 @@ public class TaskBizService {
if (!response.getStatusCode().is2xxSuccessful()) {
throw new RuntimeException("Failed to create task on admin
server: " + targetUrl);
}
+ remoteResponse = response.getBody();
}
String finalTaskID = taskID;
@@ -93,7 +109,7 @@ public class TaskBizService {
job.setUpdateUid(req.getUid());
return job;
}).collect(Collectors.toList());
- jobInfoService.createJobs(jobs);
+
EventMeshTaskInfo taskInfo = new EventMeshTaskInfo();
taskInfo.setTaskID(finalTaskID);
taskInfo.setTaskName(req.getTaskName());
@@ -102,8 +118,9 @@ public class TaskBizService {
taskInfo.setCreateUid(req.getUid());
taskInfo.setSourceRegion(req.getSourceRegion());
taskInfo.setTargetRegion(req.getTargetRegion());
+ List<EventMeshJobInfo> eventMeshJobInfoList =
jobInfoService.createJobs(jobs);
taskInfoService.save(taskInfo);
- return finalTaskID;
+ return buildCreateTaskResponse(finalTaskID, eventMeshJobInfoList,
remoteResponse);
}
private JobDetail parse(CreateTaskRequest.JobDetail src) throws
ClassNotFoundException {
@@ -111,29 +128,48 @@ public class TaskBizService {
dst.setJobDesc(src.getJobDesc());
dst.setTransportType(src.getTransportType());
dst.setSourceConnectorDesc(src.getSourceConnectorDesc());
- Map<String, Object> sourceDataMap = src.getSourceDataSource();
- DataSource sourceDataSource = new DataSource();
-
sourceDataSource.setType(DataSourceType.fromString(sourceDataMap.get("type").toString()));
- sourceDataSource.setDesc((String) sourceDataMap.get("desc"));
- sourceDataSource.setConfClazz((Class<? extends Config>)
Class.forName(sourceDataMap.get("confClazz").toString()));
-
sourceDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sourceDataMap.get("conf")),
sourceDataSource.getConfClazz()));
- sourceDataSource.setRegion((String) sourceDataMap.get("region"));
- dst.setSourceDataSource(sourceDataSource);
-
+ try {
+
dst.setSourceDataSource(mapToDataSource(src.getSourceDataSource()));
+ dst.setSinkDataSource(mapToDataSource(src.getSinkDataSource()));
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Failed to map data source", e);
+ }
dst.setSinkConnectorDesc(src.getSinkConnectorDesc());
- Map<String, Object> sinkDataMap = src.getSinkDataSource();
- DataSource sinkDataSource = new DataSource();
-
sinkDataSource.setType(DataSourceType.fromString(sinkDataMap.get("type").toString()));
- sinkDataSource.setDesc((String) sinkDataMap.get("desc"));
- sinkDataSource.setConfClazz((Class<? extends Config>)
Class.forName(sinkDataMap.get("confClazz").toString()));
-
sinkDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sinkDataMap.get("conf")),
sinkDataSource.getConfClazz()));
- sinkDataSource.setRegion((String) sinkDataMap.get("region"));
- dst.setSinkDataSource(sinkDataSource);
-
// full/increase/check
dst.setJobType(src.getJobType());
dst.setFromRegion(src.getFromRegion());
dst.setRunningRegion(src.getRunningRegion());
return dst;
}
+
+ private DataSource mapToDataSource(Map<String, Object> dataMap) throws
ClassNotFoundException {
+ DataSource dataSource = new DataSource();
+
dataSource.setType(DataSourceType.fromString(dataMap.get(TYPE).toString()));
+ dataSource.setDesc((String) dataMap.get(DESC));
+ dataSource.setConfClazz((Class<? extends Config>)
Class.forName(dataMap.get(CONF_CLAZZ).toString()));
+
dataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(dataMap.get(CONF)),
dataSource.getConfClazz()));
+ dataSource.setRegion((String) dataMap.get(REGION));
+ return dataSource;
+ }
+
+ private CreateTaskResponse buildCreateTaskResponse(String taskId,
List<EventMeshJobInfo> eventMeshJobInfoList, String remoteResponse) {
+ CreateTaskResponse createTaskResponse = new CreateTaskResponse();
+ createTaskResponse.setTaskId(taskId);
+ List<CreateTaskRequest.JobDetail> jobDetailList = new ArrayList<>();
+ if (!eventMeshJobInfoList.isEmpty()) {
+ for (EventMeshJobInfo eventMeshJobInfo : eventMeshJobInfoList) {
+ CreateTaskRequest.JobDetail jobDetail = new
CreateTaskRequest.JobDetail();
+ jobDetail.setJobId(eventMeshJobInfo.getJobID());
+
jobDetail.setRunningRegion(eventMeshJobInfo.getRunningRegion());
+ jobDetailList.add(jobDetail);
+ }
+ }
+ if (!StringUtils.isEmpty(remoteResponse)) {
+ Response response = JsonUtils.parseObject(remoteResponse,
Response.class);
+ CreateTaskResponse remoteCreateTaskResponse =
JsonUtils.convertValue(response.getData(), CreateTaskResponse.class);
+ jobDetailList.addAll(remoteCreateTaskResponse.getJobIdList());
+ }
+ createTaskResponse.setJobIdList(jobDetailList);
+ return createTaskResponse;
+ }
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
index c895b5c44..b09a3e10e 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
@@ -56,6 +56,8 @@ public class CreateTaskRequest {
@Data
public static class JobDetail {
+ private String jobId;
+
private String jobDesc;
// full/increase/check
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java
index a6f5628d6..11678dfcf 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java
@@ -17,5 +17,17 @@
package org.apache.eventmesh.common.remote.response;
+import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
+
+import java.util.List;
+
+import lombok.Data;
+
+@Data
public class CreateTaskResponse extends BaseRemoteResponse {
+
+ private String taskId;
+
+ private List<CreateTaskRequest.JobDetail> jobIdList;
+
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
index bf9195703..9e9cea304 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
@@ -54,6 +54,10 @@ public class JsonUtils {
OBJECT_MAPPER.registerModule(new JavaTimeModule());
}
+ public static <T> T convertValue(Object fromValue, Class<T> toValueType) {
+ return OBJECT_MAPPER.convertValue(fromValue, toValueType);
+ }
+
public static <T> T mapToObject(Map<String, Object> map, Class<T>
beanClass) {
if (map == null) {
return null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]