This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 691aab015 [ISSUE #5071] Enhancement for admin server and canal
source/sink connector (#5072)
691aab015 is described below
commit 691aab0152022ec98ef74e6252775bbd336a2f50
Author: mike_xwm <[email protected]>
AuthorDate: Wed Aug 7 18:16:23 2024 +0800
[ISSUE #5071] Enhancement for admin server and canal source/sink connector
(#5072)
* [ISSUE #5069] Enhancement for http source/sink connector
* update http source connector & config
* fix checkstyle error
* [ISSUE #5071] Enhancement for admin server and canal source/sink connector
---
eventmesh-admin-server/build.gradle | 2 +-
eventmesh-admin-server/conf/application.yaml | 8 +++-
eventmesh-admin-server/conf/eventmesh.sql | 7 +--
.../conf/mapper/EventMeshDataSourceMapper.xml | 5 +-
.../server/web/db/entity/EventMeshDataSource.java | 2 +
.../web/handler/impl/FetchJobRequestHandler.java | 4 +-
.../service/datasource/DataSourceBizService.java | 2 +
.../server/web/service/job/JobInfoBizService.java | 16 +++++--
.../server/web/service/task/TaskBizService.java | 35 ++++++++++++--
.../common/remote/datasource/DataSourceType.java | 9 ++++
.../request/CreateOrUpdateDataSourceReq.java | 1 +
.../common/remote/request/CreateTaskRequest.java | 6 +--
...apache.eventmesh.common.remote.payload.IPayload | 1 +
.../connector/canal/CanalConnectRecord.java | 5 +-
.../canal/sink/connector/CanalSinkConnector.java | 24 +++++++---
.../connector/canal/source/EntryParser.java | 15 ++++--
.../offsetmgmt/admin/AdminOffsetService.java | 13 ++++-
.../offsetmgmt/api/data/ConnectRecord.java | 53 ++++----------------
.../eventmesh/runtime/boot/RuntimeInstance.java | 34 +++++++------
.../runtime/connector/ConnectorRuntime.java | 56 +++++++++++++++-------
.../src/main/resources/connector.yaml | 6 ++-
.../src/main/resources/runtime.yaml | 2 +-
22 files changed, 195 insertions(+), 111 deletions(-)
diff --git a/eventmesh-admin-server/build.gradle
b/eventmesh-admin-server/build.gradle
index bdb6406da..1fec2c7c5 100644
--- a/eventmesh-admin-server/build.gradle
+++ b/eventmesh-admin-server/build.gradle
@@ -20,7 +20,7 @@ dependencies {
implementation project(":eventmesh-common")
implementation project(":eventmesh-registry:eventmesh-registry-api")
implementation project(":eventmesh-registry:eventmesh-registry-nacos")
- implementation
project(':eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api')
+ implementation
project(":eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api")
implementation "com.alibaba.nacos:nacos-client"
implementation("org.springframework.boot:spring-boot-starter-web") {
exclude group: "org.springframework.boot", module:
"spring-boot-starter-tomcat"
diff --git a/eventmesh-admin-server/conf/application.yaml
b/eventmesh-admin-server/conf/application.yaml
index afbcd4a43..274196db6 100644
--- a/eventmesh-admin-server/conf/application.yaml
+++ b/eventmesh-admin-server/conf/application.yaml
@@ -26,13 +26,17 @@ mybatis-plus:
configuration:
map-underscore-to-camel-case: false
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
+# http server port
+server:
+ port: 8082
event-mesh:
admin-server:
serviceName: DEFAULT_GROUP@@em_adm_server
+ # grpc server port
port: 8081
adminServerList:
region1:
- - http://localhost:8081
- region2:
- http://localhost:8082
+ region2:
+ - http://localhost:8083
region: region1
\ No newline at end of file
diff --git a/eventmesh-admin-server/conf/eventmesh.sql
b/eventmesh-admin-server/conf/eventmesh.sql
index 94edbb6fa..bdad02a8d 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -33,6 +33,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
`dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`description` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci
DEFAULT NULL,
`configuration` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT
NULL,
+ `configurationClass` varchar(200) CHARACTER SET utf8mb4 COLLATE
utf8mb4_general_ci NOT NULL DEFAULT '',
`region` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -134,13 +135,13 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
-- export table eventmesh.event_mesh_verify structure
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
- `id` int NOT NULL,
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`recordID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`recordSig` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
- `connectorName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
+ `connectorName` varchar(200) COLLATE utf8mb4_general_ci DEFAULT NULL,
`connectorStage` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
- `position` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
+ `position` text COLLATE utf8mb4_general_ci DEFAULT NULL,
`createTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
b/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
index d100e1903..50e6ad82c 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
@@ -28,6 +28,7 @@
<result property="dataType" column="dataType" jdbcType="VARCHAR"/>
<result property="description" column="description"
jdbcType="VARCHAR"/>
<result property="configuration" column="configuration"
jdbcType="VARCHAR"/>
+ <result property="configurationClass" column="configurationClass"
jdbcType="VARCHAR"/>
<result property="region" column="region" jdbcType="VARCHAR"/>
<result property="createUid" column="createUid"
jdbcType="VARCHAR"/>
<result property="updateUid" column="updateUid"
jdbcType="VARCHAR"/>
@@ -37,7 +38,7 @@
<sql id="Base_Column_List">
id,dataType,description,
- configuration,region,createUid,updateUid,
- createTime,updateTime
+ configuration,configurationClass,region,
+ createUid,updateUid,createTime,updateTime
</sql>
</mapper>
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java
index 9d81366aa..e6e328984 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java
@@ -41,6 +41,8 @@ public class EventMeshDataSource implements Serializable {
private String configuration;
+ private String configurationClass;
+
private String region;
private String createUid;
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
index 8f159fa45..b377bcddd 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
@@ -53,9 +53,9 @@ public class FetchJobRequestHandler extends
BaseRequestHandler<FetchJobRequest,
}
response.setId(detail.getJobID());
JobConnectorConfig config = new JobConnectorConfig();
-
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource()));
+
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf()));
config.setSourceConnectorDesc(detail.getSourceConnectorDesc());
-
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource()));
+
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf()));
config.setSourceConnectorDesc(detail.getSinkConnectorDesc());
response.setConnectorConfig(config);
response.setTransportType(detail.getTransportType());
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java
index 433847a4c..4d2d67010 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java
@@ -29,12 +29,14 @@ import org.springframework.stereotype.Service;
@Service
public class DataSourceBizService {
+
@Autowired
private EventMeshDataSourceService dataSourceService;
public EventMeshDataSource createDataSource(CreateOrUpdateDataSourceReq
dataSource) {
EventMeshDataSource entity = new EventMeshDataSource();
entity.setConfiguration(JsonUtils.toJSONString(dataSource.getConfig()));
+ entity.setConfigurationClass(dataSource.getConfigClass());
entity.setDataType(dataSource.getType().name());
entity.setCreateUid(dataSource.getOperator());
entity.setUpdateUid(dataSource.getOperator());
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
index ea0265848..0657383e2 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
@@ -27,6 +27,7 @@ import
org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService;
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
import
org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService;
import
org.apache.eventmesh.admin.server.web.service.position.PositionBizService;
+import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.remote.TaskState;
import org.apache.eventmesh.common.remote.TransportType;
import org.apache.eventmesh.common.remote.datasource.DataSource;
@@ -114,6 +115,7 @@ public class JobInfoBizService {
source.setRegion(job.getSourceDataSource().getRegion());
source.setDesc(job.getSourceConnectorDesc());
source.setConfig(job.getSourceDataSource().getConf());
+
source.setConfigClass(job.getSourceDataSource().getConfClazz().getName());
EventMeshDataSource createdSource =
dataSourceBizService.createDataSource(source);
entity.setSourceData(createdSource.getId());
@@ -123,6 +125,7 @@ public class JobInfoBizService {
sink.setRegion(job.getSinkDataSource().getRegion());
sink.setDesc(job.getSinkConnectorDesc());
sink.setConfig(job.getSinkDataSource().getConf());
+
sink.setConfigClass(job.getSinkDataSource().getConfClazz().getName());
EventMeshDataSource createdSink =
dataSourceBizService.createDataSource(sink);
entity.setTargetData(createdSink.getId());
@@ -141,18 +144,22 @@ public class JobInfoBizService {
if (jobID == null) {
return null;
}
- EventMeshJobInfo job = jobInfoService.getById(jobID);
+ EventMeshJobInfo job =
jobInfoService.getOne(Wrappers.<EventMeshJobInfo>query().eq("jobID", jobID));
if (job == null) {
return null;
}
JobDetail detail = new JobDetail();
+ detail.setTaskID(job.getTaskID());
detail.setJobID(job.getJobID());
EventMeshDataSource source =
dataSourceService.getById(job.getSourceData());
EventMeshDataSource target =
dataSourceService.getById(job.getTargetData());
if (source != null) {
if (!StringUtils.isBlank(source.getConfiguration())) {
try {
-
detail.setSourceDataSource(JsonUtils.parseObject(source.getConfiguration(),
DataSource.class));
+ DataSource sourceDataSource = new DataSource();
+ Class<?> configClass =
Class.forName(source.getConfigurationClass());
+ sourceDataSource.setConf((Config)
JsonUtils.parseObject(source.getConfiguration(), configClass));
+ detail.setSourceDataSource(sourceDataSource);
} catch (Exception e) {
log.warn("parse source config id [{}] fail",
job.getSourceData(), e);
throw new
AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal source data source
config");
@@ -168,7 +175,10 @@ public class JobInfoBizService {
if (target != null) {
if (!StringUtils.isBlank(target.getConfiguration())) {
try {
-
detail.setSinkDataSource(JsonUtils.parseObject(target.getConfiguration(),
DataSource.class));
+ DataSource sinkDataSource = new DataSource();
+ Class<?> configClass =
Class.forName(target.getConfigurationClass());
+ sinkDataSource.setConf((Config)
JsonUtils.parseObject(target.getConfiguration(), configClass));
+ detail.setSinkDataSource(sinkDataSource);
} catch (Exception e) {
log.warn("parse sink config id [{}] fail",
job.getSourceData(), e);
throw new
AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal target data sink
config");
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
index f68645613..7089f9cf7 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
@@ -22,12 +22,17 @@ import
org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo;
import
org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService;
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
+import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.remote.TaskState;
+import org.apache.eventmesh.common.remote.datasource.DataSource;
+import org.apache.eventmesh.common.remote.datasource.DataSourceType;
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -40,6 +45,7 @@ import org.springframework.web.client.RestTemplate;
@Service
public class TaskBizService {
+
@Autowired
private EventMeshTaskInfoService taskInfoService;
@@ -76,7 +82,12 @@ public class TaskBizService {
String finalTaskID = taskID;
List<JobDetail> jobs = req.getJobs().stream().map(x -> {
- JobDetail job = parse(x);
+ JobDetail job = null;
+ try {
+ job = parse(x);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
job.setTaskID(finalTaskID);
job.setCreateUid(req.getUid());
job.setUpdateUid(req.getUid());
@@ -95,14 +106,30 @@ public class TaskBizService {
return finalTaskID;
}
- private JobDetail parse(CreateTaskRequest.JobDetail src) {
+ private JobDetail parse(CreateTaskRequest.JobDetail src) throws
ClassNotFoundException {
JobDetail dst = new JobDetail();
dst.setJobDesc(src.getJobDesc());
dst.setTransportType(src.getTransportType());
dst.setSourceConnectorDesc(src.getSourceConnectorDesc());
- dst.setSourceDataSource(src.getSourceDataSource());
+ Map<String, Object> sourceDataMap = src.getSourceDataSource();
+ DataSource sourceDataSource = new DataSource();
+
sourceDataSource.setType(DataSourceType.fromString(sourceDataMap.get("type").toString()));
+ sourceDataSource.setDesc((String) sourceDataMap.get("desc"));
+ sourceDataSource.setConfClazz((Class<? extends Config>)
Class.forName(sourceDataMap.get("confClazz").toString()));
+
sourceDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sourceDataMap.get("conf")),
sourceDataSource.getConfClazz()));
+ sourceDataSource.setRegion((String) sourceDataMap.get("region"));
+ dst.setSourceDataSource(sourceDataSource);
+
dst.setSinkConnectorDesc(src.getSinkConnectorDesc());
- dst.setSinkDataSource(src.getSinkDataSource());
+ Map<String, Object> sinkDataMap = src.getSinkDataSource();
+ DataSource sinkDataSource = new DataSource();
+
sinkDataSource.setType(DataSourceType.fromString(sinkDataMap.get("type").toString()));
+ sinkDataSource.setDesc((String) sinkDataMap.get("desc"));
+ sinkDataSource.setConfClazz((Class<? extends Config>)
Class.forName(sinkDataMap.get("confClazz").toString()));
+
sinkDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sinkDataMap.get("conf")),
sinkDataSource.getConfClazz()));
+ sinkDataSource.setRegion((String) sinkDataMap.get("region"));
+ dst.setSinkDataSource(sinkDataSource);
+
// full/increase/check
dst.setJobType(src.getJobType());
dst.setFromRegion(src.getFromRegion());
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java
index 985f311b9..8c40971e7 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java
@@ -61,4 +61,13 @@ public enum DataSourceType {
}
return TYPES[index];
}
+
+ public static DataSourceType fromString(String type) {
+ for (DataSourceType dataSourceType : DataSourceType.values()) {
+ if (dataSourceType.name().equalsIgnoreCase(type)) {
+ return dataSourceType;
+ }
+ }
+ throw new IllegalArgumentException("No enum constant for type: " +
type);
+ }
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
index fadfa68e7..f78349703 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
@@ -34,6 +34,7 @@ public class CreateOrUpdateDataSourceReq extends
BaseRemoteRequest {
private DataSourceType type;
private String desc;
private Config config;
+ private String configClass;
private String region;
private String operator;
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
index 47c45595a..c895b5c44 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
@@ -18,10 +18,10 @@
package org.apache.eventmesh.common.remote.request;
import org.apache.eventmesh.common.remote.TransportType;
-import org.apache.eventmesh.common.remote.datasource.DataSource;
import org.apache.eventmesh.common.remote.job.JobType;
import java.util.List;
+import java.util.Map;
import lombok.Data;
@@ -61,11 +61,11 @@ public class CreateTaskRequest {
// full/increase/check
private JobType jobType;
- private DataSource sourceDataSource;
+ private Map<String, Object> sourceDataSource;
private String sourceConnectorDesc;
- private DataSource sinkDataSource;
+ private Map<String, Object> sinkDataSource;
private String sinkConnectorDesc;
diff --git
a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
index 2af95c751..82d5c94dd 100644
---
a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
+++
b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
@@ -16,6 +16,7 @@
org.apache.eventmesh.common.remote.request.FetchJobRequest
org.apache.eventmesh.common.remote.response.FetchJobResponse
org.apache.eventmesh.common.remote.request.ReportPositionRequest
+org.apache.eventmesh.common.remote.request.ReportVerifyRequest
org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest
org.apache.eventmesh.common.remote.request.FetchPositionRequest
org.apache.eventmesh.common.remote.response.FetchPositionResponse
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
index 36ecd158f..6f112081e 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
@@ -22,13 +22,16 @@ import org.apache.eventmesh.common.remote.job.SyncMode;
import org.apache.eventmesh.connector.canal.model.EventColumn;
import org.apache.eventmesh.connector.canal.model.EventType;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;
@Data
-public class CanalConnectRecord {
+public class CanalConnectRecord implements Serializable {
+
+ private static final long serialVersionUID = 1L;
private String schemaName;
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
index 2ecb2384a..49fb10dd3 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
@@ -44,6 +44,7 @@ import
org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.SerializationUtils;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -163,7 +164,11 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
public void put(List<ConnectRecord> sinkRecords) {
DbLoadContext context = new DbLoadContext();
for (ConnectRecord connectRecord : sinkRecords) {
- List<CanalConnectRecord> canalConnectRecordList =
(List<CanalConnectRecord>) connectRecord.getData();
+ List<CanalConnectRecord> canalConnectRecordList = new
ArrayList<>();
+ // deep copy connectRecord data
+ for (CanalConnectRecord record : (List<CanalConnectRecord>)
connectRecord.getData()) {
+ canalConnectRecordList.add(SerializationUtils.clone(record));
+ }
canalConnectRecordList = filterRecord(canalConnectRecordList);
if (isDdlDatas(canalConnectRecordList)) {
doDdl(context, canalConnectRecordList, connectRecord);
@@ -175,7 +180,7 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
DbLoadData loadData = new DbLoadData();
doBefore(canalConnectRecordList, loadData);
- doLoad(context, sinkConfig, loadData);
+ doLoad(context, sinkConfig, loadData, connectRecord);
}
@@ -259,7 +264,7 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
}
}
- private void doLoad(DbLoadContext context, CanalSinkConfig sinkConfig,
DbLoadData loadData) {
+ private void doLoad(DbLoadContext context, CanalSinkConfig sinkConfig,
DbLoadData loadData, ConnectRecord connectRecord) {
List<List<CanalConnectRecord>> batchDatas = new ArrayList<>();
for (TableLoadData tableData : loadData.getTables()) {
if (useBatch) {
@@ -271,7 +276,7 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
}
}
- doTwoPhase(context, sinkConfig, batchDatas, true);
+ doTwoPhase(context, sinkConfig, batchDatas, true, connectRecord);
batchDatas.clear();
@@ -289,7 +294,7 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
}
}
- doTwoPhase(context, sinkConfig, batchDatas, true);
+ doTwoPhase(context, sinkConfig, batchDatas, true, connectRecord);
batchDatas.clear();
}
@@ -390,7 +395,8 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
&& StringUtils.equals(source.getSql(), target.getSql());
}
- private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig,
List<List<CanalConnectRecord>> totalRows, boolean canBatch) {
+ private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig,
List<List<CanalConnectRecord>> totalRows, boolean canBatch,
+ ConnectRecord connectRecord) {
List<Future<Exception>> results = new ArrayList<>();
for (List<CanalConnectRecord> rows : totalRows) {
if (CollectionUtils.isEmpty(rows)) {
@@ -404,6 +410,9 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
Exception ex = null;
try {
ex = result.get();
+ if (ex == null) {
+
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
+ }
} catch (Exception e) {
ex = e;
}
@@ -433,12 +442,14 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
log.warn("skip exception for data : {} , caused by
{}",
retryRecord,
ExceptionUtils.getFullStackTrace(ex));
+
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
} catch (Exception ex) {
// do skip
log.warn("skip exception for data : {} , caused by {}",
retryRecord,
ExceptionUtils.getFullStackTrace(ex));
+
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
}
} else {
@@ -451,6 +462,7 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
} catch (Exception ex) {
log.error("##load phase two failed!", ex);
log.error("sink connector will shutdown by " +
ex.getMessage(), ex);
+
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord,
ex));
executor.shutdown();
System.exit(1);
}
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
index 5c4303588..75572a5fa 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
@@ -67,7 +67,7 @@ public class EntryParser {
}
} else {
// if not gtid mode, need check weather the entry
is loopback by specified column value
- needSync = checkNeedSync(sourceConfig,
rowChange.getRowDatas(0));
+ needSync = checkNeedSync(sourceConfig, rowChange);
if (needSync) {
transactionDataBuffer.add(entry);
}
@@ -115,9 +115,16 @@ public class EntryParser {
}
}
- private static boolean checkNeedSync(CanalSourceConfig sourceConfig,
RowData rowData) {
- Column markedColumn =
getColumnIgnoreCase(rowData.getAfterColumnsList(),
- sourceConfig.getNeedSyncMarkTableColumnName());
+ private static boolean checkNeedSync(CanalSourceConfig sourceConfig,
RowChange rowChange) {
+ Column markedColumn = null;
+ CanalEntry.EventType eventType = rowChange.getEventType();
+ if (eventType.equals(CanalEntry.EventType.DELETE) ||
eventType.equals(CanalEntry.EventType.UPDATE)) {
+ markedColumn =
getColumnIgnoreCase(rowChange.getRowDatas(0).getBeforeColumnsList(),
+ sourceConfig.getNeedSyncMarkTableColumnName());
+ } else if (eventType.equals(CanalEntry.EventType.INSERT)) {
+ markedColumn =
getColumnIgnoreCase(rowChange.getRowDatas(0).getAfterColumnsList(),
+ sourceConfig.getNeedSyncMarkTableColumnName());
+ }
if (markedColumn != null) {
return StringUtils.equalsIgnoreCase(markedColumn.getValue(),
sourceConfig.getNeedSyncMarkTableColumnValue());
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
index 08270fc02..977661b13 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
@@ -41,6 +41,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Random;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -236,7 +237,7 @@ public class AdminOffsetService implements
OffsetManagementService {
this.dataSourceType = offsetStorageConfig.getDataSourceType();
this.dataSinkType = offsetStorageConfig.getDataSinkType();
- this.adminServerAddr = offsetStorageConfig.getOffsetStorageAddr();
+ this.adminServerAddr =
getRandomAdminServerAddr(offsetStorageConfig.getOffsetStorageAddr());
this.channel = ManagedChannelBuilder.forTarget(adminServerAddr)
.usePlaintext()
.build();
@@ -274,4 +275,14 @@ public class AdminOffsetService implements
OffsetManagementService {
this.jobState = TaskState.RUNNING;
this.jobId = offsetStorageConfig.getExtensions().get("jobId");
}
+
+ private String getRandomAdminServerAddr(String adminServerAddrList) {
+ String[] addresses = adminServerAddrList.split(";");
+ if (addresses.length == 0) {
+ throw new IllegalArgumentException("Admin server address list is
empty");
+ }
+ Random random = new Random();
+ int randomIndex = random.nextInt(addresses.length);
+ return addresses[randomIndex];
+ }
}
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
index b3fc4346c..0a41e18f7 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
@@ -26,21 +26,30 @@ import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import lombok.Getter;
+import lombok.Setter;
+
/**
* SourceDataEntries are generated by SourceTasks and passed to specific
message queue to store.
*/
+@Getter
public class ConnectRecord {
private final String recordId = UUID.randomUUID().toString();
+ @Setter
private Long timestamp;
+ @Setter
private Object data;
+ @Setter
private RecordPosition position;
+ @Setter
private KeyValue extensions;
+ @Setter
private SendMessageCallback callback;
public ConnectRecord() {
@@ -63,42 +72,6 @@ public class ConnectRecord {
this.data = data;
}
- public String getRecordId() {
- return recordId;
- }
-
- public Long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(Long timestamp) {
- this.timestamp = timestamp;
- }
-
- public Object getData() {
- return data;
- }
-
- public void setData(Object data) {
- this.data = data;
- }
-
- public KeyValue getExtensions() {
- return extensions;
- }
-
- public void setExtensions(KeyValue extensions) {
- this.extensions = extensions;
- }
-
- public RecordPosition getPosition() {
- return position;
- }
-
- public void setPosition(RecordPosition position) {
- this.position = position;
- }
-
public void addExtension(KeyValue extensions) {
if (this.extensions == null) {
this.extensions = new DefaultKeyValue();
@@ -137,14 +110,6 @@ public class ConnectRecord {
return this.extensions.getObject(key);
}
- public SendMessageCallback getCallback() {
- return callback;
- }
-
- public void setCallback(SendMessageCallback callback) {
- this.callback = callback;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
index acea321e9..beb1d1eed 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
@@ -76,6 +76,8 @@ public class RuntimeInstance {
}
// use registry adminServiceAddr value replace config
runtimeInstanceConfig.setAdminServiceAddr(adminServiceAddr);
+ } else {
+ adminServiceAddr = runtimeInstanceConfig.getAdminServiceAddr();
}
runtimeFactory = initRuntimeFactory(runtimeInstanceConfig);
@@ -84,23 +86,25 @@ public class RuntimeInstance {
}
public void start() throws Exception {
- if (!StringUtils.isBlank(adminServiceAddr) && registryService != null)
{
- registryService.subscribe((event) -> {
- log.info("runtime receive registry event: {}", event);
- List<RegisterServerInfo> registerServerInfoList =
event.getInstances();
- Map<String, RegisterServerInfo> registerServerInfoMap = new
HashMap<>();
- for (RegisterServerInfo registerServerInfo :
registerServerInfoList) {
- registerServerInfoMap.put(registerServerInfo.getAddress(),
registerServerInfo);
- }
- if (!registerServerInfoMap.isEmpty()) {
- adminServerInfoMap = registerServerInfoMap;
- updateAdminServerAddr();
- }
- }, runtimeInstanceConfig.getAdminServiceName());
+ if (StringUtils.isBlank(adminServiceAddr)) {
+ throw new RuntimeException("admin server address is empty, please
check");
+ } else {
+ if (registryService != null) {
+ registryService.subscribe((event) -> {
+ log.info("runtime receive registry event: {}", event);
+ List<RegisterServerInfo> registerServerInfoList =
event.getInstances();
+ Map<String, RegisterServerInfo> registerServerInfoMap =
new HashMap<>();
+ for (RegisterServerInfo registerServerInfo :
registerServerInfoList) {
+
registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo);
+ }
+ if (!registerServerInfoMap.isEmpty()) {
+ adminServerInfoMap = registerServerInfoMap;
+ updateAdminServerAddr();
+ }
+ }, runtimeInstanceConfig.getAdminServiceName());
+ }
runtime.start();
isStarted = true;
- } else {
- throw new RuntimeException("admin server address is empty, please
check");
}
}
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
index 1e589ebd9..501f222fd 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
@@ -63,9 +63,12 @@ import org.apache.commons.lang3.StringUtils;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -132,6 +135,8 @@ public class ConnectorRuntime implements Runtime {
public static final String CALLBACK_EXTENSION = "callBackExtension";
+ private String adminServerAddr;
+
public ConnectorRuntime(RuntimeInstanceConfig runtimeInstanceConfig) {
this.runtimeInstanceConfig = runtimeInstanceConfig;
@@ -149,8 +154,9 @@ public class ConnectorRuntime implements Runtime {
}
private void initAdminService() {
+ adminServerAddr =
getRandomAdminServerAddr(runtimeInstanceConfig.getAdminServiceAddr());
// create gRPC channel
- channel =
ManagedChannelBuilder.forTarget(runtimeInstanceConfig.getAdminServiceAddr()).usePlaintext().build();
+ channel =
ManagedChannelBuilder.forTarget(adminServerAddr).usePlaintext().build();
adminServiceStub =
AdminServiceGrpc.newStub(channel).withWaitForReady();
@@ -176,6 +182,16 @@ public class ConnectorRuntime implements Runtime {
requestObserver = adminServiceStub.invokeBiStream(responseObserver);
}
+ private String getRandomAdminServerAddr(String adminServerAddrList) {
+ String[] addresses = adminServerAddrList.split(";");
+ if (addresses.length == 0) {
+ throw new IllegalArgumentException("Admin server address list is
empty");
+ }
+ Random random = new Random();
+ int randomIndex = random.nextInt(addresses.length);
+ return addresses[randomIndex];
+ }
+
private void initStorageService() {
// TODO: init producer & consumer
producer =
StoragePluginFactory.getMeshMQProducer(runtimeInstanceConfig.getStoragePluginType());
@@ -202,25 +218,18 @@ public class ConnectorRuntime implements Runtime {
connectorRuntimeConfig.setSinkConnectorDesc(jobResponse.getConnectorConfig().getSinkConnectorDesc());
connectorRuntimeConfig.setSinkConnectorConfig(jobResponse.getConnectorConfig().getSinkConnectorConfig());
- ConnectorCreateService<?> sourceConnectorCreateService =
-
ConnectorPluginFactory.createConnector(connectorRuntimeConfig.getSourceConnectorType()
+ "-Source");
- sourceConnector = (Source) sourceConnectorCreateService.create();
-
- SourceConfig sourceConfig = (SourceConfig)
ConfigUtil.parse(connectorRuntimeConfig.getSourceConnectorConfig(),
sourceConnector.configClass());
- SourceConnectorContext sourceConnectorContext = new
SourceConnectorContext();
- sourceConnectorContext.setSourceConfig(sourceConfig);
-
sourceConnectorContext.setRuntimeConfig(connectorRuntimeConfig.getRuntimeConfig());
- sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);
- if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) {
-
sourceConnectorContext.setRecordPositionList(jobResponse.getPosition());
- }
-
// spi load offsetMgmtService
this.offsetManagement = new RecordOffsetManagement();
this.committableOffsets =
RecordOffsetManagement.CommittableOffsets.EMPTY;
- OffsetStorageConfig offsetStorageConfig =
sourceConfig.getOffsetStorageConfig();
+ OffsetStorageConfig offsetStorageConfig = new OffsetStorageConfig();
+
offsetStorageConfig.setOffsetStorageAddr(connectorRuntimeConfig.getRuntimeConfig().get("offsetStorageAddr").toString());
+
offsetStorageConfig.setOffsetStorageType(connectorRuntimeConfig.getRuntimeConfig().get("offsetStoragePluginType").toString());
offsetStorageConfig.setDataSourceType(jobResponse.getTransportType().getSrc());
offsetStorageConfig.setDataSinkType(jobResponse.getTransportType().getDst());
+ Map<String, String> offsetStorageExtensions = new HashMap<>();
+ offsetStorageExtensions.put("jobId",
connectorRuntimeConfig.getJobID());
+ offsetStorageConfig.setExtensions(offsetStorageExtensions);
+
this.offsetManagementService =
Optional.ofNullable(offsetStorageConfig).map(OffsetStorageConfig::getOffsetStorageType)
.map(storageType ->
EventMeshExtensionFactory.getExtension(OffsetManagementService.class,
storageType))
.orElse(new DefaultOffsetManagementServiceImpl());
@@ -228,6 +237,18 @@ public class ConnectorRuntime implements Runtime {
this.offsetStorageWriter = new
OffsetStorageWriterImpl(offsetManagementService);
this.offsetStorageReader = new
OffsetStorageReaderImpl(offsetManagementService);
+ ConnectorCreateService<?> sourceConnectorCreateService =
+
ConnectorPluginFactory.createConnector(connectorRuntimeConfig.getSourceConnectorType()
+ "-Source");
+ sourceConnector = (Source) sourceConnectorCreateService.create();
+
+ SourceConfig sourceConfig = (SourceConfig)
ConfigUtil.parse(connectorRuntimeConfig.getSourceConnectorConfig(),
sourceConnector.configClass());
+ SourceConnectorContext sourceConnectorContext = new
SourceConnectorContext();
+ sourceConnectorContext.setSourceConfig(sourceConfig);
+
sourceConnectorContext.setRuntimeConfig(connectorRuntimeConfig.getRuntimeConfig());
+ sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);
+ if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) {
+
sourceConnectorContext.setRecordPositionList(jobResponse.getPosition());
+ }
sourceConnector.init(sourceConnectorContext);
ConnectorCreateService<?> sinkConnectorCreateService =
@@ -330,6 +351,9 @@ public class ConnectorRuntime implements Runtime {
// TODO: use producer pub record to storage replace below
if (connectorRecordList != null && !connectorRecordList.isEmpty())
{
for (ConnectRecord record : connectorRecordList) {
+
+ queue.put(record);
+
// if enabled incremental data reporting consistency check
if
(connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) {
reportVerifyRequest(record, connectorRuntimeConfig,
ConnectorStage.SOURCE);
@@ -363,8 +387,6 @@ public class ConnectorRuntime implements Runtime {
}
});
- queue.put(record);
-
offsetManagement.awaitAllMessages(5000,
TimeUnit.MILLISECONDS);
// update & commit offset
updateCommittableOffsets();
diff --git a/eventmesh-runtime-v2/src/main/resources/connector.yaml
b/eventmesh-runtime-v2/src/main/resources/connector.yaml
index 2e79e5ced..3e407fa3e 100644
--- a/eventmesh-runtime-v2/src/main/resources/connector.yaml
+++ b/eventmesh-runtime-v2/src/main/resources/connector.yaml
@@ -15,7 +15,9 @@
# limitations under the License.
#
-taskID: 1
-jobID: 1
+taskID: 9c18a0d2-7a61-482c-8275-34f8c2786cea
+jobID: a01fd5e1-d295-4b89-99bc-0ae23eb85acf
region: region1
runtimeConfig: # this used for connector runtime config
+ offsetStoragePluginType: admin
+ offsetStorageAddr: "127.0.0.1:8081;127.0.0.1:8081"
\ No newline at end of file
diff --git a/eventmesh-runtime-v2/src/main/resources/runtime.yaml
b/eventmesh-runtime-v2/src/main/resources/runtime.yaml
index c5ffac9d9..9ac36f27b 100644
--- a/eventmesh-runtime-v2/src/main/resources/runtime.yaml
+++ b/eventmesh-runtime-v2/src/main/resources/runtime.yaml
@@ -21,4 +21,4 @@ registryServerAddr: 127.0.0.1:8085
registryPluginType: nacos
storagePluginType: memory
adminServiceName: eventmesh-admin
-adminServiceAddr: "127.0.0.1:8085;127.0.0.1:8086"
+adminServiceAddr: "127.0.0.1:8081;127.0.0.1:8081"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]