This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 771a18940 [ISSUE #5067] Enhancement for eventmesh-admin-server (#5068)
771a18940 is described below
commit 771a18940498f6dc55f25564f7cf83cd4618204b
Author: mike_xwm <[email protected]>
AuthorDate: Sun Aug 4 22:16:19 2024 +0800
[ISSUE #5067] Enhancement for eventmesh-admin-server (#5068)
* [ISSUE #5040] Support gtid mode for sync data with mysql
* fix conflicts with master
* fix checkstyle error
* [ISSUE #5044] Data synchronization strong verification in mariadb gtid
mode
* fix checkstyle error
* [ISSUE #5048] Add report verify request to admin for connector runtime
* fix checkstyle error
* [ISSUE #5052] Enhancement for source\sink connector
* fix checkstyle error
* fix checkstyle error
* [ISSUE #5067] Enhancement for eventmesh-admin-server
---
eventmesh-admin-server/conf/application.yaml | 10 ++-
eventmesh-admin-server/conf/eventmesh.sql | 14 ++--
.../conf/mapper/EventMeshJobInfoMapper.xml | 40 +++++-----
.../conf/mapper/EventMeshTaskInfoMapper.xml | 13 ++--
.../admin/server/AdminServerProperties.java | 5 ++
.../eventmesh/admin/server/web/HttpServer.java | 9 ++-
.../server/web/db/entity/EventMeshJobInfo.java | 8 +-
.../server/web/db/entity/EventMeshTaskInfo.java | 10 ++-
.../web/db/mapper/EventMeshJobInfoExtMapper.java | 18 ++++-
.../service/impl/EventMeshVerifyServiceImpl.java | 31 ++++----
.../eventmesh/admin/server/web/pojo/JobDetail.java | 8 +-
.../server/web/service/job/JobInfoBizService.java | 30 +++++---
.../server/web/service/task/TaskBizService.java | 55 +++++++++++---
.../eventmesh/common/remote/TransportType.java | 6 +-
.../common/remote/datasource/DataSource.java | 25 ++++---
.../datasource/MySqlIncDataSourceSourceConf.java | 85 ----------------------
.../request/CreateOrUpdateDataSourceReq.java | 5 +-
.../common/remote/request/CreateTaskRequest.java | 33 ++++++++-
.../eventmesh/runtime/RuntimeInstanceConfig.java | 4 +-
.../eventmesh/runtime/boot/RuntimeInstance.java | 44 ++++++-----
.../runtime/connector/ConnectorRuntime.java | 2 +-
.../src/main/resources/runtime.yaml | 2 +
22 files changed, 249 insertions(+), 208 deletions(-)
diff --git a/eventmesh-admin-server/conf/application.yaml
b/eventmesh-admin-server/conf/application.yaml
index 54795057c..afbcd4a43 100644
--- a/eventmesh-admin-server/conf/application.yaml
+++ b/eventmesh-admin-server/conf/application.yaml
@@ -28,5 +28,11 @@ mybatis-plus:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
event-mesh:
admin-server:
- service-name: DEFAULT_GROUP@@em_adm_server
- port: 8081
\ No newline at end of file
+ serviceName: DEFAULT_GROUP@@em_adm_server
+ port: 8081
+ adminServerList:
+ region1:
+ - http://localhost:8081
+ region2:
+ - http://localhost:8082
+ region: region1
\ No newline at end of file
diff --git a/eventmesh-admin-server/conf/eventmesh.sql
b/eventmesh-admin-server/conf/eventmesh.sql
index 82d5c5331..94edbb6fa 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -45,14 +45,15 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT
NULL,
- `desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
+ `jobDesc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`transportType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`sourceData` int NOT NULL DEFAULT '0',
`targetData` int NOT NULL DEFAULT '0',
- `state` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT
NULL DEFAULT '',
+ `jobState` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT
NULL DEFAULT '',
`jobType` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT
NULL DEFAULT '',
`fromRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
+ `runningRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`updateUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
@@ -118,10 +119,11 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` (
CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
- `name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
- `desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
- `state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT
'taskstate',
- `fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
+ `taskName` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT
NULL,
+ `taskDesc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
+ `taskState` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT ''
COMMENT 'taskstate',
+ `sourceRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
+ `targetRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml
b/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml
index 02e880668..a053d1c83 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml
@@ -19,31 +19,33 @@
-->
<!DOCTYPE mapper
- PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
- "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+ "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper
namespace="org.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper">
<resultMap id="BaseResultMap"
type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo">
- <id property="id" column="id" jdbcType="INTEGER"/>
- <result property="jobID" column="jobID" jdbcType="VARCHAR"/>
- <result property="desc" column="desc" jdbcType="VARCHAR"/>
- <result property="taskID" column="taskID" jdbcType="VARCHAR"/>
- <result property="transportType" column="transportType"
jdbcType="VARCHAR"/>
- <result property="sourceData" column="sourceData"
jdbcType="INTEGER"/>
- <result property="targetData" column="targetData"
jdbcType="INTEGER"/>
- <result property="state" column="state" jdbcType="VARCHAR"/>
- <result property="jobType" column="jobType" jdbcType="VARCHAR"/>
- <result property="fromRegion" column="fromRegion"
jdbcType="VARCHAR"/>
- <result property="createUid" column="createUid"
jdbcType="VARCHAR"/>
- <result property="updateUid" column="updateUid"
jdbcType="VARCHAR"/>
- <result property="createTime" column="createTime"
jdbcType="TIMESTAMP"/>
- <result property="updateTime" column="updateTime"
jdbcType="TIMESTAMP"/>
+ <id property="id" column="id" jdbcType="INTEGER"/>
+ <result property="jobID" column="jobID" jdbcType="VARCHAR"/>
+ <result property="jobDesc" column="desc" jdbcType="VARCHAR"/>
+ <result property="taskID" column="taskID" jdbcType="VARCHAR"/>
+ <result property="transportType" column="transportType"
jdbcType="VARCHAR"/>
+ <result property="sourceData" column="sourceData" jdbcType="INTEGER"/>
+ <result property="targetData" column="targetData" jdbcType="INTEGER"/>
+ <result property="jobState" column="state" jdbcType="VARCHAR"/>
+ <result property="jobType" column="jobType" jdbcType="VARCHAR"/>
+ <result property="fromRegion" column="sourceRegion"
jdbcType="VARCHAR"/>
+ <result property="runningRegion" column="targetRegion"
jdbcType="VARCHAR"/>
+ <result property="createUid" column="createUid" jdbcType="VARCHAR"/>
+ <result property="updateUid" column="updateUid" jdbcType="VARCHAR"/>
+ <result property="createTime" column="createTime"
jdbcType="TIMESTAMP"/>
+ <result property="updateTime" column="updateTime"
jdbcType="TIMESTAMP"/>
</resultMap>
<sql id="Base_Column_List">
- id,jobID,desc,
+ id,jobID,jobDesc,
taskID,transportType,sourceData,
- targetData,state,jobType,
- fromRegion,createTime,updateTime
+ targetData,jobState,jobType,
+ fromRegion,runningRegion,createUid,
+ updateUid,createTime,updateTime
</sql>
</mapper>
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml
b/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml
index 05b1dc52a..c3514fd94 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml
@@ -26,10 +26,11 @@
<resultMap id="BaseResultMap"
type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo">
<id property="id" column="id" jdbcType="INTEGER"/>
<result property="taskID" column="taskID" jdbcType="VARCHAR"/>
- <result property="name" column="name" jdbcType="VARCHAR"/>
- <result property="desc" column="desc" jdbcType="VARCHAR"/>
- <result property="state" column="state" jdbcType="VARCHAR"/>
- <result property="fromRegion" column="fromRegion"
jdbcType="VARCHAR"/>
+ <result property="taskName" column="taskName" jdbcType="VARCHAR"/>
+ <result property="taskDesc" column="taskDesc" jdbcType="VARCHAR"/>
+ <result property="taskState" column="taskState"
jdbcType="VARCHAR"/>
+ <result property="sourceRegion" column="sourceRegion"
jdbcType="VARCHAR"/>
+ <result property="targetRegion" column="targetRegion"
jdbcType="VARCHAR"/>
<result property="createUid" column="createUid"
jdbcType="VARCHAR"/>
<result property="updateUid" column="updateUid"
jdbcType="VARCHAR"/>
<result property="createTime" column="createTime"
jdbcType="TIMESTAMP"/>
@@ -37,8 +38,8 @@
</resultMap>
<sql id="Base_Column_List">
- id,taskID,name,
- desc,state,fromRegion,
+ id,taskID,taskName,
+ taskDesc,taskState,sourceRegion,targetRegion,
createUid,updateUid,createTime,
updateTime
</sql>
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java
index 2162731e2..612d39807 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java
@@ -17,6 +17,9 @@
package org.apache.eventmesh.admin.server;
+import java.util.List;
+import java.util.Map;
+
import org.springframework.boot.context.properties.ConfigurationProperties;
import lombok.Getter;
@@ -32,4 +35,6 @@ public class AdminServerProperties {
private String configurationPath;
private String configurationFile;
private String serviceName;
+ private Map<String, List<String>> adminServerList;
+ private String region;
}
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
index bd896d546..a5daac881 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
@@ -24,18 +24,21 @@ import
org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
+import com.alibaba.druid.support.json.JSONUtils;
+
@RestController
@RequestMapping("/eventmesh/admin")
public class HttpServer {
@Autowired
private TaskBizService taskService;
- @RequestMapping("/createTask")
- public ResponseEntity<Response<String>> createOrUpdateTask(@RequestBody
CreateTaskRequest task) {
+ @RequestMapping(value = "/createTask", method = RequestMethod.POST)
+ public ResponseEntity<Object> createOrUpdateTask(@RequestBody
CreateTaskRequest task) {
String uuid = taskService.createTask(task);
- return ResponseEntity.ok(Response.success(uuid));
+ return
ResponseEntity.ok(JSONUtils.toJSONString(Response.success(uuid)));
}
public boolean deleteTask(Long id) {
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java
index 23db5f6c2..a77eaaaca 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java
@@ -37,7 +37,7 @@ public class EventMeshJobInfo implements Serializable {
private String jobID;
- private String desc;
+ private String jobDesc;
private String taskID;
@@ -47,12 +47,16 @@ public class EventMeshJobInfo implements Serializable {
private Integer targetData;
- private String state;
+ private String jobState;
private String jobType;
+ // job request from region
private String fromRegion;
+ // job actually running region
+ private String runningRegion;
+
private String createUid;
private String updateUid;
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java
index 5d1b6648c..2d40f4a08 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java
@@ -37,13 +37,15 @@ public class EventMeshTaskInfo implements Serializable {
private String taskID;
- private String name;
+ private String taskName;
- private String desc;
+ private String taskDesc;
- private String state;
+ private String taskState;
- private String fromRegion;
+ private String sourceRegion;
+
+ private String targetRegion;
private String createUid;
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoExtMapper.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoExtMapper.java
index 7f46dcab4..c04c4e374 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoExtMapper.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoExtMapper.java
@@ -21,11 +21,12 @@ import
org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
-import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import java.util.List;
+import org.springframework.transaction.annotation.Transactional;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
@@ -33,9 +34,18 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
*/
@Mapper
public interface EventMeshJobInfoExtMapper extends
BaseMapper<EventMeshJobInfo> {
- @Insert("insert into event_mesh_job_info(`taskID`,`state`,`jobType`)
values"
- + "<foreach collection= 'jobs' item='job'
separator=','>(#{job.taskID},#{job.state},#{job.jobType})</foreach>")
- @Options(useGeneratedKeys = true, keyProperty = "jobID")
+
+ @Insert("<script>"
+ + "insert into event_mesh_job_info(jobID, jobDesc, taskID,
transportType, sourceData, "
+ + "targetData, jobState, jobType, fromRegion, runningRegion, "
+ + "createUid, updateUid) values"
+ + "<foreach collection= 'jobs' item='job' separator=','>"
+ + "(#{job.jobID}, #{job.jobDesc}, #{job.taskID}, #{job.transportType},
"
+ + "#{job.sourceData}, #{job.targetData}, #{job.jobState},
#{job.jobType}, "
+ + "#{job.fromRegion}, #{job.runningRegion}, #{job.createUid},
#{job.updateUid})"
+ + "</foreach>"
+ + "</script>")
+ @Transactional(rollbackFor = Exception.class)
int saveBatch(@Param("jobs") List<EventMeshJobInfo> jobInfoList);
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshVerifyServiceImpl.java
similarity index 58%
copy from
eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
copy to
eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshVerifyServiceImpl.java
index 4ecf9b452..5e49ba32e 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshVerifyServiceImpl.java
@@ -15,24 +15,25 @@
* limitations under the License.
*/
-package org.apache.eventmesh.common.remote.request;
+package org.apache.eventmesh.admin.server.web.db.service.impl;
-import org.apache.eventmesh.common.remote.datasource.DataSource;
-import org.apache.eventmesh.common.remote.datasource.DataSourceType;
+import org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify;
+import org.apache.eventmesh.admin.server.web.db.mapper.EventMeshVerifyMapper;
+import org.apache.eventmesh.admin.server.web.db.service.EventMeshVerifyService;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
+import org.springframework.stereotype.Service;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
/**
- * create or update datasource with custom data source config
+ * event_mesh_verify
*/
-@Data
-@EqualsAndHashCode(callSuper = true)
-public class CreateOrUpdateDataSourceReq extends BaseRemoteRequest {
- private Integer id;
- private DataSourceType type;
- private String desc;
- private DataSource config;
- private String region;
- private String operator;
+@Service
+public class EventMeshVerifyServiceImpl extends
ServiceImpl<EventMeshVerifyMapper, EventMeshVerify>
+ implements EventMeshVerifyService {
+
}
+
+
+
+
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java
index c47b28448..0e2fa6487 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java
@@ -34,7 +34,7 @@ public class JobDetail {
private String jobID;
- private String desc;
+ private String jobDesc;
private String taskID;
@@ -50,7 +50,11 @@ public class JobDetail {
private String updateUid;
- private String region;
+ // job request from region
+ private String fromRegion;
+
+ // job actually running region
+ private String runningRegion;
private DataSource sourceDataSource;
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
index 9affa10e6..ea0265848 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.admin.server.web.service.job;
+import org.apache.eventmesh.admin.server.AdminServerProperties;
import org.apache.eventmesh.admin.server.AdminServerRuntimeException;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
@@ -70,12 +71,15 @@ public class JobInfoBizService {
@Autowired
private PositionBizService positionBizService;
+ @Autowired
+ private AdminServerProperties properties;
+
public boolean updateJobState(String jobID, TaskState state) {
if (jobID == null || state == null) {
return false;
}
EventMeshJobInfo jobInfo = new EventMeshJobInfo();
- jobInfo.setState(state.name());
+ jobInfo.setJobState(state.name());
return jobInfoService.update(jobInfo,
Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("state",
TaskState.DELETE.name()));
}
@@ -86,34 +90,40 @@ public class JobInfoBizService {
return null;
}
List<EventMeshJobInfo> entityList = new LinkedList<>();
+
for (JobDetail job : jobs) {
+ // if running region not equal with admin region continue
+ if (!job.getRunningRegion().equals(properties.getRegion())) {
+ continue;
+ }
EventMeshJobInfo entity = new EventMeshJobInfo();
- entity.setState(TaskState.INIT.name());
+ entity.setJobState(TaskState.INIT.name());
entity.setTaskID(job.getTaskID());
entity.setJobType(job.getJobType().name());
- entity.setDesc(job.getDesc());
+ entity.setJobDesc(job.getJobDesc());
String jobID = UUID.randomUUID().toString();
entity.setJobID(jobID);
entity.setTransportType(job.getTransportType().name());
entity.setCreateUid(job.getCreateUid());
entity.setUpdateUid(job.getUpdateUid());
- entity.setFromRegion(job.getRegion());
+ entity.setFromRegion(job.getFromRegion());
+ entity.setRunningRegion(job.getRunningRegion());
CreateOrUpdateDataSourceReq source = new
CreateOrUpdateDataSourceReq();
source.setType(job.getTransportType().getSrc());
source.setOperator(job.getCreateUid());
- source.setRegion(job.getRegion());
+ source.setRegion(job.getSourceDataSource().getRegion());
source.setDesc(job.getSourceConnectorDesc());
- source.setConfig(job.getSourceDataSource());
+ source.setConfig(job.getSourceDataSource().getConf());
EventMeshDataSource createdSource =
dataSourceBizService.createDataSource(source);
entity.setSourceData(createdSource.getId());
CreateOrUpdateDataSourceReq sink = new
CreateOrUpdateDataSourceReq();
sink.setType(job.getTransportType().getDst());
sink.setOperator(job.getCreateUid());
- sink.setRegion(job.getRegion());
+ sink.setRegion(job.getSinkDataSource().getRegion());
sink.setDesc(job.getSinkConnectorDesc());
- sink.setConfig(job.getSinkDataSource());
- EventMeshDataSource createdSink =
dataSourceBizService.createDataSource(source);
+ sink.setConfig(job.getSinkDataSource().getConf());
+ EventMeshDataSource createdSink =
dataSourceBizService.createDataSource(sink);
entity.setTargetData(createdSink.getId());
entityList.add(entity);
@@ -167,7 +177,7 @@ public class JobInfoBizService {
detail.setSinkConnectorDesc(target.getDescription());
}
- TaskState state = TaskState.fromIndex(job.getState());
+ TaskState state = TaskState.fromIndex(job.getJobState());
if (state == null) {
throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA,
"illegal job state in db");
}
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
index b4fdc57af..f68645613 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.admin.server.web.service.task;
+import org.apache.eventmesh.admin.server.AdminServerProperties;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo;
import
org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService;
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
@@ -24,13 +25,18 @@ import
org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
import org.apache.eventmesh.common.remote.TaskState;
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.List;
+import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
+import org.springframework.web.client.RestTemplate;
@Service
public class TaskBizService {
@@ -40,38 +46,67 @@ public class TaskBizService {
@Autowired
private JobInfoBizService jobInfoService;
+ @Autowired
+ private AdminServerProperties properties;
+
@Transactional
public String createTask(CreateTaskRequest req) {
- String taskID = UUID.randomUUID().toString();
+ String taskID = req.getTaskId();
+ if (StringUtils.isEmpty(taskID)) {
+ taskID = UUID.randomUUID().toString();
+ req.setTaskId(taskID);
+ }
+
+ String targetRegion = req.getTargetRegion();
+ // not from other admin && target not equals with self region
+ if (!req.isFlag() && !StringUtils.equals(properties.getRegion(),
targetRegion)) {
+ List<String> adminServerList =
properties.getAdminServerList().get(targetRegion);
+ if (adminServerList == null || adminServerList.isEmpty()) {
+ throw new RuntimeException("No admin server available for
region: " + targetRegion);
+ }
+ String targetUrl = adminServerList.get(new
Random().nextInt(adminServerList.size())) + "/eventmesh/admin/createTask";
+
+ RestTemplate restTemplate = new RestTemplate();
+ req.setFlag(true);
+ ResponseEntity<String> response =
restTemplate.postForEntity(targetUrl, req, String.class);
+ if (!response.getStatusCode().is2xxSuccessful()) {
+ throw new RuntimeException("Failed to create task on admin
server: " + targetUrl);
+ }
+ }
+
+ String finalTaskID = taskID;
List<JobDetail> jobs = req.getJobs().stream().map(x -> {
JobDetail job = parse(x);
- job.setTaskID(taskID);
- job.setRegion(req.getRegion());
+ job.setTaskID(finalTaskID);
job.setCreateUid(req.getUid());
job.setUpdateUid(req.getUid());
return job;
}).collect(Collectors.toList());
jobInfoService.createJobs(jobs);
EventMeshTaskInfo taskInfo = new EventMeshTaskInfo();
- taskInfo.setTaskID(taskID);
- taskInfo.setName(req.getName());
- taskInfo.setDesc(req.getDesc());
- taskInfo.setState(TaskState.INIT.name());
+ taskInfo.setTaskID(finalTaskID);
+ taskInfo.setTaskName(req.getTaskName());
+ taskInfo.setTaskDesc(req.getTaskDesc());
+ taskInfo.setTaskState(TaskState.INIT.name());
taskInfo.setCreateUid(req.getUid());
- taskInfo.setFromRegion(req.getRegion());
+ taskInfo.setSourceRegion(req.getSourceRegion());
+ taskInfo.setTargetRegion(req.getTargetRegion());
taskInfoService.save(taskInfo);
- return taskID;
+ return finalTaskID;
}
private JobDetail parse(CreateTaskRequest.JobDetail src) {
JobDetail dst = new JobDetail();
- dst.setDesc(src.getDesc());
+ dst.setJobDesc(src.getJobDesc());
dst.setTransportType(src.getTransportType());
dst.setSourceConnectorDesc(src.getSourceConnectorDesc());
dst.setSourceDataSource(src.getSourceDataSource());
dst.setSinkConnectorDesc(src.getSinkConnectorDesc());
dst.setSinkDataSource(src.getSinkDataSource());
+ // full/increase/check
dst.setJobType(src.getJobType());
+ dst.setFromRegion(src.getFromRegion());
+ dst.setRunningRegion(src.getRunningRegion());
return dst;
}
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
index 95a88a23f..82e7bc021 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
@@ -30,8 +30,12 @@ public enum TransportType {
REDIS_REDIS(DataSourceType.REDIS, DataSourceType.REDIS),
ROCKETMQ_ROCKETMQ(DataSourceType.ROCKETMQ, DataSourceType.ROCKETMQ),
MYSQL_HTTP(DataSourceType.MYSQL, DataSourceType.HTTP),
+ ROCKETMQ_HTTP(DataSourceType.ROCKETMQ, DataSourceType.HTTP),
HTTP_MYSQL(DataSourceType.HTTP, DataSourceType.MYSQL),
- REDIS_MQ(DataSourceType.REDIS, DataSourceType.ROCKETMQ);
+ HTTP_REDIS(DataSourceType.HTTP, DataSourceType.REDIS),
+ HTTP_ROCKETMQ(DataSourceType.HTTP, DataSourceType.ROCKETMQ),
+ REDIS_MQ(DataSourceType.REDIS, DataSourceType.ROCKETMQ),
+ ;
private static final Map<String, TransportType> INDEX_TYPES = new
HashMap<>();
private static final TransportType[] TYPES = TransportType.values();
private static final String SEPARATOR = "@";
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java
index 7af3812f2..afda98480 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java
@@ -17,27 +17,30 @@
package org.apache.eventmesh.common.remote.datasource;
+import org.apache.eventmesh.common.config.connector.Config;
+import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig;
+import
org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig;
+
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import lombok.Getter;
+import lombok.Data;
-@Getter
+@Data
public class DataSource {
- private final DataSourceType type;
+
+ private DataSourceType type;
+
private String desc;
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS)
@JsonSubTypes({
- @JsonSubTypes.Type(value = MySqlIncDataSourceSourceConf.class, name =
"MySqlIncDataSourceSourceConf")
+ @JsonSubTypes.Type(value = CanalSourceConfig.class, name =
"CanalSourceConfig"),
+ @JsonSubTypes.Type(value = CanalSinkConfig.class, name =
"CanalSinkConfig")
})
- private final DataSourceConf conf;
- private final Class<? extends DataSourceConf> confClazz;
+ private Config conf;
- public DataSource(DataSourceType type, DataSourceConf conf) {
- this.type = type;
- this.conf = conf;
- this.confClazz = conf.getConfClass();
- }
+ private Class<? extends Config> confClazz;
+ private String region;
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/MySqlIncDataSourceSourceConf.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/MySqlIncDataSourceSourceConf.java
deleted file mode 100644
index f8c825e96..000000000
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/MySqlIncDataSourceSourceConf.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.common.remote.datasource;
-
-import
org.apache.eventmesh.common.config.connector.rdb.canal.SourceConnectorConfig;
-import org.apache.eventmesh.common.remote.job.SyncConsistency;
-import org.apache.eventmesh.common.remote.job.SyncMode;
-import org.apache.eventmesh.common.remote.offset.RecordPosition;
-
-import java.util.List;
-
-public class MySqlIncDataSourceSourceConf extends DataSourceConf {
- @Override
- public Class<? extends DataSourceConf> getConfClass() {
- return MySqlIncDataSourceSourceConf.class;
- }
-
- private String destination;
-
- private Long canalInstanceId;
-
- private String desc;
-
- private boolean ddlSync = true;
-
- private boolean filterTableError = false;
-
- private Long slaveId;
-
- private Short clientId;
-
- private String serverUUID;
-
- private boolean isMariaDB = true;
-
- private boolean isGTIDMode = true;
-
- private Integer batchSize = 10000;
-
- private Long batchTimeout = -1L;
-
- private String tableFilter;
-
- private String fieldFilter;
-
- private List<RecordPosition> recordPositions;
-
- // ================================= channel parameter
- // ================================
-
- // enable remedy
- private Boolean enableRemedy = false;
-
- // sync mode: field/row
- private SyncMode syncMode;
-
- // sync consistency
- private SyncConsistency syncConsistency;
-
- // ================================= system parameter
- // ================================
-
- // Column name of the bidirectional synchronization mark
- private String needSyncMarkTableColumnName = "needSync";
-
- // Column value of the bidirectional synchronization mark
- private String needSyncMarkTableColumnValue = "needSync";
-
- private SourceConnectorConfig sourceConnectorConfig;
-}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
index 4ecf9b452..fadfa68e7 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
@@ -17,7 +17,7 @@
package org.apache.eventmesh.common.remote.request;
-import org.apache.eventmesh.common.remote.datasource.DataSource;
+import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.remote.datasource.DataSourceType;
import lombok.Data;
@@ -29,10 +29,11 @@ import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = true)
public class CreateOrUpdateDataSourceReq extends BaseRemoteRequest {
+
private Integer id;
private DataSourceType type;
private String desc;
- private DataSource config;
+ private Config config;
private String region;
private String operator;
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
index ce24e0341..47c45595a 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
@@ -30,16 +30,35 @@ import lombok.Data;
*/
@Data
public class CreateTaskRequest {
- private String name;
- private String desc;
+
+ private String taskId;
+
+ // task name
+ private String taskName;
+
+ // task description
+ private String taskDesc;
+
+ // task owner or updater
private String uid;
+
private List<JobDetail> jobs;
- private String region;
+
+ // task source region
+ private String sourceRegion;
+
+ // task target region
+ private String targetRegion;
+
+ // mark request send by other region admin, default is false
+ private boolean flag = false;
@Data
public static class JobDetail {
- private String desc;
+ private String jobDesc;
+
+ // full/increase/check
private JobType jobType;
private DataSource sourceDataSource;
@@ -51,5 +70,11 @@ public class CreateTaskRequest {
private String sinkConnectorDesc;
private TransportType transportType;
+
+ // job request from region
+ private String fromRegion;
+
+ // job actually running region
+ private String runningRegion;
}
}
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/RuntimeInstanceConfig.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/RuntimeInstanceConfig.java
index 7171b3fc2..caa5330fe 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/RuntimeInstanceConfig.java
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/RuntimeInstanceConfig.java
@@ -28,6 +28,8 @@ import lombok.NoArgsConstructor;
@Config(path = "classPath://runtime.yaml")
public class RuntimeInstanceConfig {
+ private boolean registryEnabled;
+
private String registryServerAddr;
private String registryPluginType;
@@ -36,7 +38,7 @@ public class RuntimeInstanceConfig {
private String adminServiceName;
- private String adminServerAddr;
+ private String adminServiceAddr;
private ComponentType componentType;
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
index 0fade897f..acea321e9 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
@@ -41,11 +41,11 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RuntimeInstance {
- private String adminServerAddr = "127.0.0.1:8081";
+ private String adminServiceAddr;
private Map<String, RegisterServerInfo> adminServerInfoMap = new
HashMap<>();
- private final RegistryService registryService;
+ private RegistryService registryService;
private Runtime runtime;
@@ -57,29 +57,34 @@ public class RuntimeInstance {
public RuntimeInstance(RuntimeInstanceConfig runtimeInstanceConfig) {
this.runtimeInstanceConfig = runtimeInstanceConfig;
- this.registryService =
RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType());
+ if (runtimeInstanceConfig.isRegistryEnabled()) {
+ this.registryService =
RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType());
+ }
}
public void init() throws Exception {
- registryService.init();
- QueryInstances queryInstances = new QueryInstances();
-
queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName());
- queryInstances.setHealth(true);
- List<RegisterServerInfo> adminServerRegisterInfoList =
registryService.selectInstances(queryInstances);
- if (!adminServerRegisterInfoList.isEmpty()) {
- adminServerAddr =
getRandomAdminServerAddr(adminServerRegisterInfoList);
- } else {
- throw new RuntimeException("admin server address is empty, please
check");
+ if (registryService != null) {
+ registryService.init();
+ QueryInstances queryInstances = new QueryInstances();
+
queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName());
+ queryInstances.setHealth(true);
+ List<RegisterServerInfo> adminServerRegisterInfoList =
registryService.selectInstances(queryInstances);
+ if (!adminServerRegisterInfoList.isEmpty()) {
+ adminServiceAddr =
getRandomAdminServerAddr(adminServerRegisterInfoList);
+ } else {
+ throw new RuntimeException("admin server address is empty,
please check");
+ }
+ // use registry adminServiceAddr value replace config
+ runtimeInstanceConfig.setAdminServiceAddr(adminServiceAddr);
}
- runtimeInstanceConfig.setAdminServerAddr(adminServerAddr);
+
runtimeFactory = initRuntimeFactory(runtimeInstanceConfig);
runtime = runtimeFactory.createRuntime(runtimeInstanceConfig);
runtime.init();
}
public void start() throws Exception {
- if (!StringUtils.isBlank(adminServerAddr)) {
-
+ if (!StringUtils.isBlank(adminServiceAddr) && registryService != null)
{
registryService.subscribe((event) -> {
log.info("runtime receive registry event: {}", event);
List<RegisterServerInfo> registerServerInfoList =
event.getInstances();
@@ -91,7 +96,6 @@ public class RuntimeInstance {
adminServerInfoMap = registerServerInfoMap;
updateAdminServerAddr();
}
-
}, runtimeInstanceConfig.getAdminServiceName());
runtime.start();
isStarted = true;
@@ -106,14 +110,14 @@ public class RuntimeInstance {
private void updateAdminServerAddr() throws Exception {
if (isStarted) {
- if (!adminServerInfoMap.containsKey(adminServerAddr)) {
- adminServerAddr = getRandomAdminServerAddr(adminServerInfoMap);
- log.info("admin server address changed to: {}",
adminServerAddr);
+ if (!adminServerInfoMap.containsKey(adminServiceAddr)) {
+ adminServiceAddr =
getRandomAdminServerAddr(adminServerInfoMap);
+ log.info("admin server address changed to: {}",
adminServiceAddr);
shutdown();
start();
}
} else {
- adminServerAddr = getRandomAdminServerAddr(adminServerInfoMap);
+ adminServiceAddr = getRandomAdminServerAddr(adminServerInfoMap);
}
}
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
index 6cd0452b8..1e589ebd9 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
@@ -150,7 +150,7 @@ public class ConnectorRuntime implements Runtime {
private void initAdminService() {
// create gRPC channel
- channel =
ManagedChannelBuilder.forTarget(runtimeInstanceConfig.getAdminServerAddr()).usePlaintext().build();
+ channel =
ManagedChannelBuilder.forTarget(runtimeInstanceConfig.getAdminServiceAddr()).usePlaintext().build();
adminServiceStub =
AdminServiceGrpc.newStub(channel).withWaitForReady();
diff --git a/eventmesh-runtime-v2/src/main/resources/runtime.yaml
b/eventmesh-runtime-v2/src/main/resources/runtime.yaml
index 44c5f6f91..c5ffac9d9 100644
--- a/eventmesh-runtime-v2/src/main/resources/runtime.yaml
+++ b/eventmesh-runtime-v2/src/main/resources/runtime.yaml
@@ -16,7 +16,9 @@
#
componentType: CONNECTOR
+registryEnabled: false
registryServerAddr: 127.0.0.1:8085
registryPluginType: nacos
storagePluginType: memory
adminServiceName: eventmesh-admin
+adminServiceAddr: "127.0.0.1:8085;127.0.0.1:8086"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]