This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 2ba54c775 [ISSUE #5052] Enhancement for source\sink connector (#5066)
2ba54c775 is described below
commit 2ba54c7751a83348a41fc4eaa25898faaee11951
Author: mike_xwm <[email protected]>
AuthorDate: Thu Aug 1 14:35:25 2024 +0800
[ISSUE #5052] Enhancement for source\sink connector (#5066)
* [ISSUE #5040] Support gtid mode for sync data with mysql
* fix conflicts with master
* fix checkstyle error
* [ISSUE #5044] Data synchronization strong verification in mariadb gtid
mode
* fix checkstyle error
* [ISSUE #5048] Add report verify request to admin for connector runtime
* fix checkstyle error
* [ISSUE #5052] Enhancement for source\sink connector
* fix checkstyle error
* fix checkstyle error
---
eventmesh-admin-server/conf/eventmesh.sql | 52 +++++++-------------
.../server/web/service/job/JobInfoBizService.java | 1 -
.../canal/sink/connector/CanalSinkConnector.java | 41 ++++++++++++++--
.../sink/connector/CanalSinkFullConnector.java | 5 ++
.../source/connector/CanalSourceConnector.java | 5 ++
.../source/connector/CanalSourceFullConnector.java | 5 ++
.../source/connector/ChatGPTSourceConnector.java | 5 ++
.../sink/connector/DingDingSinkConnector.java | 5 ++
.../file/sink/connector/FileSinkConnector.java | 5 ++
.../file/source/connector/FileSourceConnector.java | 5 ++
.../connector/http/sink/HttpSinkConnector.java | 5 ++
.../connector/http/source/HttpSourceConnector.java | 5 ++
.../connector/jdbc/sink/JdbcSinkConnector.java | 5 ++
.../connector/jdbc/source/JdbcSourceConnector.java | 5 ++
.../kafka/sink/connector/KafkaSinkConnector.java | 5 ++
.../source/connector/KafkaSourceConnector.java | 5 ++
.../sink/connector/KnativeSinkConnector.java | 5 ++
.../source/connector/KnativeSourceConnector.java | 5 ++
.../lark/sink/connector/LarkSinkConnector.java | 5 ++
.../sink/connector/MongodbSinkConnector.java | 5 ++
.../source/connector/MongodbSourceConnector.java | 5 ++
.../sink/connector/OpenFunctionSinkConnector.java | 5 ++
.../connector/OpenFunctionSourceConnector.java | 5 ++
.../sink/connector/PravegaSinkConnector.java | 5 ++
.../source/connector/PravegaSourceConnector.java | 5 ++
.../connector/PrometheusSourceConnector.java | 5 ++
.../pulsar/sink/connector/PulsarSinkConnector.java | 5 ++
.../source/connector/PulsarSourceConnector.java | 5 ++
.../sink/connector/RabbitMQSinkConnector.java | 5 ++
.../source/connector/RabbitMQSourceConnector.java | 5 ++
.../redis/sink/connector/RedisSinkConnector.java | 5 ++
.../source/connector/RedisSourceConnector.java | 5 ++
.../sink/connector/RocketMQSinkConnector.java | 5 ++
.../source/connector/RocketMQSourceConnector.java | 5 ++
.../s3/source/connector/S3SourceConnector.java | 5 ++
.../slack/sink/connector/SlackSinkConnector.java | 5 ++
.../spring/sink/connector/SpringSinkConnector.java | 5 ++
.../spring/source/MessageSendingOperations.java | 2 +-
.../source/connector/SpringSourceConnector.java | 14 ++++--
.../wechat/sink/connector/WeChatSinkConnector.java | 5 ++
.../wecom/sink/connector/WeComSinkConnector.java | 5 ++
.../eventmesh/spring/pub/SpringPubController.java | 10 ++--
.../apache/eventmesh/openconnect/SourceWorker.java | 10 ++--
.../openconnect/api/connector/Connector.java | 11 ++++-
.../api/connector/SourceConnectorContext.java | 3 ++
.../api/callback/SendExceptionContext.java} | 6 +--
.../api/callback/SendMessageCallback.java | 4 +-
.../offsetmgmt}/api/callback/SendResult.java | 2 +-
.../offsetmgmt/api/data/ConnectRecord.java | 25 ++++++++--
.../runtime/connector/ConnectorRuntime.java | 57 +++++++++++++++++-----
.../runtime/connector/ConnectorRuntimeConfig.java | 2 +
.../src/main/resources/connector.yaml | 1 +
52 files changed, 344 insertions(+), 77 deletions(-)
diff --git a/eventmesh-admin-server/conf/eventmesh.sql
b/eventmesh-admin-server/conf/eventmesh.sql
index 586ab1c26..82d5c5331 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -23,11 +23,11 @@
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
--- 导出 eventmesh 的数据库结构
+-- export eventmesh database
CREATE DATABASE IF NOT EXISTS `eventmesh` /*!40100 DEFAULT CHARACTER SET
utf8mb4 COLLATE utf8mb4_general_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
USE `eventmesh`;
--- 导出 表 eventmesh.event_mesh_data_source 结构
+-- export table eventmesh.event_mesh_data_source structure
CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -39,11 +39,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE
-) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_general_ci;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
--- 导出 表 eventmesh.event_mesh_job_info 结构
+-- export table eventmesh.event_mesh_job_info structure
CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT
NULL,
@@ -61,11 +59,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `jobID` (`jobID`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_general_ci;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
--- 导出 表 eventmesh.event_mesh_mysql_position 结构
+-- export table eventmesh.event_mesh_mysql_position structure
CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`jobID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -80,11 +76,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `jobID` (`jobID`)
-) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci
ROW_FORMAT=DYNAMIC;
--- 导出 表 eventmesh.event_mesh_position_reporter_history 结构
+-- export table eventmesh.event_mesh_position_reporter_history structure
CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` (
`id` bigint NOT NULL AUTO_INCREMENT,
`job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -94,27 +88,23 @@ CREATE TABLE IF NOT EXISTS
`event_mesh_position_reporter_history` (
PRIMARY KEY (`id`),
KEY `job` (`job`),
KEY `address` (`address`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_general_ci COMMENT='记录position上报者变更时,老记录';
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci
COMMENT='record position reporter changes';
--- 导出 表 eventmesh.event_mesh_runtime_heartbeat 结构
+-- export table eventmesh.event_mesh_runtime_heartbeat structure
CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`adminAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT
NULL,
`runtimeAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci
NOT NULL,
`jobID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
- `reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci
NOT NULL COMMENT 'runtime本地上报时间',
+ `reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci
NOT NULL COMMENT 'runtime local report time',
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `runtimeAddr` (`runtimeAddr`),
KEY `jobID` (`jobID`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_general_ci;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
--- 导出 表 eventmesh.event_mesh_runtime_history 结构
+-- export table eventmesh.event_mesh_runtime_history structure
CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` (
`id` bigint NOT NULL AUTO_INCREMENT,
`job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -122,17 +112,15 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` (
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `address` (`address`)
-) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='记录runtime上运行任务的变更';
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci
ROW_FORMAT=DYNAMIC COMMENT='record runtime task change history';
--- 导出 表 eventmesh.event_mesh_task_info 结构
+-- export table eventmesh.event_mesh_task_info structure
CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
- `state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT
'TaskState',
+ `state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT
'taskstate',
`fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -140,11 +128,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `taskID` (`taskID`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_general_ci;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
--- 导出 表 eventmesh.event_mesh_verify 结构
+-- export table eventmesh.event_mesh_verify structure
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
`id` int NOT NULL,
`taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
@@ -157,8 +143,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
--- 数据导出被取消选择。
-
/*!40101 SET SQL_MODE=IFNULL(@OLD_SQL_MODE, '') */;
/*!40014 SET FOREIGN_KEY_CHECKS=IFNULL(@OLD_FOREIGN_KEY_CHECKS, 1) */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
index 357cf5d99..9affa10e6 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
@@ -50,7 +50,6 @@ import lombok.extern.slf4j.Slf4j;
/**
* for table 'event_mesh_job_info' db operation
- * 2024-05-09 15:51:45
*/
@Service
@Slf4j
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
index 8ecda8e12..2ecb2384a 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
@@ -38,6 +38,8 @@ import
org.apache.eventmesh.openconnect.api.ConnectorCreateService;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.sink.Sink;
+import
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.commons.lang.StringUtils;
@@ -146,6 +148,11 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
executor.shutdown();
@@ -159,7 +166,7 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
List<CanalConnectRecord> canalConnectRecordList =
(List<CanalConnectRecord>) connectRecord.getData();
canalConnectRecordList = filterRecord(canalConnectRecordList);
if (isDdlDatas(canalConnectRecordList)) {
- doDdl(context, canalConnectRecordList);
+ doDdl(context, canalConnectRecordList, connectRecord);
} else if (sinkConfig.isGTIDMode()) {
doLoadWithGtid(context, sinkConfig, connectRecord);
} else {
@@ -197,7 +204,7 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
.collect(Collectors.toList());
}
- private void doDdl(DbLoadContext context, List<CanalConnectRecord>
canalConnectRecordList) {
+ private void doDdl(DbLoadContext context, List<CanalConnectRecord>
canalConnectRecordList, ConnectRecord connectRecord) {
for (final CanalConnectRecord record : canalConnectRecordList) {
try {
Boolean result = jdbcTemplate.execute(new
StatementCallback<Boolean>() {
@@ -217,9 +224,30 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
context.getFailedRecords().add(record);
}
} catch (Throwable e) {
+
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord,
e));
throw new RuntimeException(e);
}
}
+
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
+ }
+
+ private SendExceptionContext buildSendExceptionContext(ConnectRecord
record, Throwable e) {
+ SendExceptionContext sendExceptionContext = new SendExceptionContext();
+ sendExceptionContext.setMessageId(record.getRecordId());
+ sendExceptionContext.setCause(e);
+ if
(org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic")))
{
+ sendExceptionContext.setTopic(record.getExtension("topic"));
+ }
+ return sendExceptionContext;
+ }
+
+ private SendResult convertToSendResult(ConnectRecord record) {
+ SendResult result = new SendResult();
+ result.setMessageId(record.getRecordId());
+ if
(org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic")))
{
+ result.setTopic(record.getExtension("topic"));
+ }
+ return result;
}
private void doBefore(List<CanalConnectRecord> canalConnectRecordList,
final DbLoadData loadData) {
@@ -291,6 +319,9 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
Exception ex = null;
try {
ex = result.get();
+ if (ex == null) {
+
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
+ }
} catch (Exception e) {
ex = e;
}
@@ -298,14 +329,16 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
if (skipException != null && skipException) {
if (ex != null) {
// do skip
- log.warn("skip exception for data : {} , caused by {}",
+ log.warn("skip exception will ack data : {} , caused by
{}",
filteredRows,
ExceptionUtils.getFullStackTrace(ex));
GtidBatchManager.removeGtidBatch(gtid);
+
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
} else {
if (ex != null) {
log.error("sink connector will shutdown by " +
ex.getMessage(), ExceptionUtils.getFullStackTrace(ex));
+
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord,
ex));
gtidSingleExecutor.shutdown();
System.exit(1);
} else {
@@ -314,6 +347,8 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
}
} else {
log.info("Batch received, waiting for other batches.");
+ // ack this record
+
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
}
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java
index 36c03b156..2b4c9d7a9 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java
@@ -109,6 +109,11 @@ public class CanalSinkFullConnector implements Sink,
ConnectorCreateService<Sink
return null;
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void put(List<ConnectRecord> sinkRecords) {
if (sinkRecords == null || sinkRecords.isEmpty() || sinkRecords.get(0)
== null) {
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
index f3f8b2e16..ea5ccdeed 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
@@ -267,6 +267,11 @@ public class CanalSourceConnector implements Source,
ConnectorCreateService<Sour
return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
if (!running) {
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
index df3c7571c..97730463b 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
@@ -159,6 +159,11 @@ public class CanalSourceFullConnector extends
AbstractComponent implements Sourc
return this.config.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public List<ConnectRecord> poll() {
while (flag.get()) {
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
index 4d54cb219..6b122087e 100644
---
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
@@ -224,6 +224,11 @@ public class ChatGPTSourceConnector implements Source {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
Throwable t = this.server.close().cause();
diff --git
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
index 417d9cef3..8c5a1e661 100644
---
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
@@ -103,6 +103,11 @@ public class DingDingSinkConnector implements Sink {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
isRunning = false;
diff --git
a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java
index 89222b35b..fabae0d43 100644
---
a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java
@@ -103,6 +103,11 @@ public class FileSinkConnector implements Sink {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
outputStream.flush();
diff --git
a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java
index 6ea0a0d33..68b1a5098 100644
---
a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java
@@ -86,6 +86,11 @@ public class FileSourceConnector implements Source {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
try {
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
index 6d38b4530..8a1475637 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
@@ -107,6 +107,11 @@ public class HttpSinkConnector implements Sink,
ConnectorCreateService<Sink> {
return this.httpSinkConfig.connectorConfig.getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
this.sinkHandler.stop();
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
index 4155aff91..1ca325b18 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
@@ -144,6 +144,11 @@ public class HttpSourceConnector implements Source,
ConnectorCreateService<Sourc
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
if (this.server != null) {
diff --git
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java
index 39681bf17..cc00f1e14 100644
---
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java
@@ -139,6 +139,11 @@ public class JdbcSinkConnector implements Sink {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
/**
* Stops the Connector.
*
diff --git
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
index 2b2efcbef..810a59e72 100644
---
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
@@ -192,6 +192,11 @@ public class JdbcSourceConnector extends SourceConnector {
return "JDBC Source Connector";
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
/**
* Stops the Connector.
*
diff --git
a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java
index b257cd0f4..0adafc1ce 100644
---
a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java
@@ -94,6 +94,11 @@ public class KafkaSinkConnector implements Sink {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
producer.close();
diff --git
a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
index a3be1cbf9..d57312693 100644
---
a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
@@ -94,6 +94,11 @@ public class KafkaSourceConnector implements Source {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
kafkaConsumer.unsubscribe();
diff --git
a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java
index a12a1c746..b14f77ecd 100644
---
a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java
@@ -82,6 +82,11 @@ public class KnativeSinkConnector implements Sink {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
started.compareAndSet(true, false);
diff --git
a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java
index 537c1ad4d..1b0c033e8 100644
---
a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java
@@ -65,6 +65,11 @@ public class KnativeSourceConnector implements Source {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
started.compareAndSet(true, false);
diff --git
a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java
index d340dffd1..9981322e8 100644
---
a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java
@@ -110,6 +110,11 @@ public class LarkSinkConnector implements Sink {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
if (!started.compareAndSet(true, false)) {
diff --git
a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java
index 776ea8d71..1001ffa58 100644
---
a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java
@@ -87,6 +87,11 @@ public class MongodbSinkConnector implements Sink {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
this.client.stop();
diff --git
a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
index e57c39671..df3f66d6a 100644
---
a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
@@ -93,6 +93,11 @@ public class MongodbSourceConnector implements Source {
return this.sourceConfig.connectorConfig.getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
this.client.stop();
diff --git
a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java
index 63444efe2..0f00a7e38 100644
---
a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java
@@ -74,6 +74,11 @@ public class OpenFunctionSinkConnector implements Sink {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
}
diff --git
a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
index b66bf9b18..534ecfb79 100644
---
a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
@@ -76,6 +76,11 @@ public class OpenFunctionSourceConnector implements Source {
return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
diff --git
a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java
index e5f09e435..e089ef676 100644
---
a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java
@@ -109,6 +109,11 @@ public class PravegaSinkConnector implements Sink {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
writerMap.forEach((topic, writer) -> writer.close());
diff --git
a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
index 2611617d8..836779dbc 100644
---
a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
@@ -148,6 +148,11 @@ public class PravegaSourceConnector implements Source {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
sourceHandlerMap.forEach((topic, handler) -> {
diff --git
a/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
index 5c78c718e..0cafed73f 100644
---
a/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
@@ -145,6 +145,11 @@ public class PrometheusSourceConnector implements Source {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
log.info("prometheus source connector stop.");
diff --git
a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java
index 9ff1f22a2..3f90c6c1b 100644
---
a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java
@@ -85,6 +85,11 @@ public class PulsarSinkConnector implements Sink {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
try {
diff --git
a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java
index 212d3eb48..0bc576221 100644
---
a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java
@@ -87,6 +87,11 @@ public class PulsarSourceConnector implements Source {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
try {
diff --git
a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java
index 4a94a2cb1..08d1cefba 100644
---
a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java
@@ -95,6 +95,11 @@ public class RabbitMQSinkConnector implements Sink {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
if (started) {
diff --git
a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
index 655c20d9b..0b7e726bd 100644
---
a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
@@ -117,6 +117,11 @@ public class RabbitMQSourceConnector implements Source {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
if (started) {
diff --git
a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java
index 83c3498a9..5b7d27c3b 100644
---
a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java
@@ -85,6 +85,11 @@ public class RedisSinkConnector implements Sink {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
this.redissonClient.shutdown();
diff --git
a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
index 70adce59e..868639c20 100644
---
a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
@@ -94,6 +94,11 @@ public class RedisSourceConnector implements Source {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
this.topic.removeAllListeners();
diff --git
a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java
index ae9d4824e..31d45a28f 100644
---
a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java
@@ -78,6 +78,11 @@ public class RocketMQSinkConnector implements Sink,
ConnectorCreateService<Sink>
return this.sinkConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
producer.shutdown();
diff --git
a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java
index 8ccb84acc..410f927d7 100644
---
a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java
@@ -206,6 +206,11 @@ public class RocketMQSourceConnector implements Source,
ConnectorCreateService<S
return this.sourceConfig.getConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
consumer.unsubscribe(sourceConfig.getConnectorConfig().getTopic());
diff --git
a/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java
b/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java
index d0dc30c15..078ed7691 100644
---
a/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java
@@ -121,6 +121,11 @@ public class S3SourceConnector implements Source {
return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
diff --git
a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
index e48760d50..836409af7 100644
---
a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
@@ -84,6 +84,11 @@ public class SlackSinkConnector implements Sink {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() {
isRunning = false;
diff --git
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java
index 94c40eea5..9ba99cd54 100644
---
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java
@@ -77,6 +77,11 @@ public class SpringSinkConnector implements Sink {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
diff --git
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
index a337c1cd8..5f38914bb 100644
---
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
+++
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
@@ -17,7 +17,7 @@
package org.apache.eventmesh.connector.spring.source;
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
+import
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
/**
* Operations for sending messages.
diff --git
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
index 2ab5a3a3c..db286eb60 100644
---
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
@@ -25,10 +25,10 @@ import
org.apache.eventmesh.common.remote.offset.spring.SpringRecordOffset;
import org.apache.eventmesh.common.remote.offset.spring.SpringRecordPartition;
import org.apache.eventmesh.connector.spring.source.MessageSendingOperations;
import org.apache.eventmesh.openconnect.SourceWorker;
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
+import
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import java.util.ArrayList;
@@ -95,6 +95,11 @@ public class SpringSourceConnector implements Source,
MessageSendingOperations,
return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws Exception {
@@ -123,6 +128,7 @@ public class SpringSourceConnector implements Source,
MessageSendingOperations,
/**
* Send message.
+ *
* @param message message to send
*/
@Override
@@ -136,9 +142,9 @@ public class SpringSourceConnector implements Source,
MessageSendingOperations,
/**
* Send message with a callback.
- * @param message message to send.
- * @param workerCallback After the user sends the message to the Connector,
- * the SourceWorker will fetch message and invoke.
+ *
+ * @param message message to send.
+ * @param workerCallback After the user sends the message to the
Connector, the SourceWorker will fetch message and invoke.
*/
@Override
public void send(Object message, SendMessageCallback workerCallback) {
diff --git
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
index dec3f5e5d..6908d119b 100644
---
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
@@ -115,6 +115,11 @@ public class WeChatSinkConnector implements Sink {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws IOException {
isRunning = false;
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
index ef6aed58c..ca628fa59 100644
---
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
@@ -95,6 +95,11 @@ public class WeComSinkConnector implements Sink {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
@Override
public void stop() throws IOException {
isRunning = false;
diff --git
a/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java
b/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java
index b7ea8890e..a734bb6ef 100644
---
a/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java
+++
b/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java
@@ -19,9 +19,9 @@ package org.apache.eventmesh.spring.pub;
import org.apache.eventmesh.common.utils.JsonUtils;
import
org.apache.eventmesh.connector.spring.source.connector.SpringSourceConnector;
-import org.apache.eventmesh.openconnect.api.callback.SendExcepionContext;
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
-import org.apache.eventmesh.openconnect.api.callback.SendResult;
+import
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import java.util.HashMap;
import java.util.Map;
@@ -53,8 +53,8 @@ public class SpringPubController {
}
@Override
- public void onException(SendExcepionContext sendExcepionContext) {
- log.info("Spring source worker send message to EventMesh
failed!", sendExcepionContext.getCause());
+ public void onException(SendExceptionContext sendExceptionContext)
{
+ log.info("Spring source worker send message to EventMesh
failed!", sendExceptionContext.getCause());
}
});
return "success!";
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
index 6e48aa1de..2a2162a7a 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
@@ -32,11 +32,11 @@ import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.SystemUtils;
-import org.apache.eventmesh.openconnect.api.callback.SendExcepionContext;
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
-import org.apache.eventmesh.openconnect.api.callback.SendResult;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
+import
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import
org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffsetManagement;
import
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.DefaultOffsetManagementServiceImpl;
@@ -264,8 +264,8 @@ public class SourceWorker implements ConnectorWorker {
return result;
}
- private SendExcepionContext convertToExceptionContext(CloudEvent event,
Throwable cause) {
- SendExcepionContext exceptionContext = new SendExcepionContext();
+ private SendExceptionContext convertToExceptionContext(CloudEvent event,
Throwable cause) {
+ SendExceptionContext exceptionContext = new SendExceptionContext();
exceptionContext.setTopic(event.getId());
exceptionContext.setMessageId(event.getId());
exceptionContext.setCause(cause);
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
index 8ac09eac3..07e44aea9 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
@@ -34,8 +34,7 @@ public interface Connector extends ComponentLifeCycle {
Class<? extends Config> configClass();
/**
- * This init method is obsolete. For detailed discussion,
- * please see <a
href="https://github.com/apache/eventmesh/issues/4565">here</a>
+ * This init method is obsolete. For detailed discussion, please see <a
href="https://github.com/apache/eventmesh/issues/4565">here</a>
* <p>
* Initializes the Connector with the provided configuration.
*
@@ -67,4 +66,12 @@ public interface Connector extends ComponentLifeCycle {
*/
String name();
+ /**
+ * This method will be called when an exception occurs while processing a
ConnectRecord object. This method can be used to handle the exception,
+ * such as logging error information, or stopping the connector's
operation when an exception occurs.
+ *
+ * @param record The ConnectRecord object that was being processed when
the exception occurred
+ */
+ void onException(ConnectRecord record);
+
}
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
index 55c88ce55..f70e77248 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
@@ -22,6 +22,7 @@ import
org.apache.eventmesh.common.remote.offset.RecordPosition;
import
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader;
import java.util.List;
+import java.util.Map;
import lombok.Data;
@@ -35,6 +36,8 @@ public class SourceConnectorContext implements
ConnectorContext {
public SourceConfig sourceConfig;
+ public Map<String, Object> runtimeConfig;
+
// initial record position
public List<RecordPosition> recordPositionList;
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java
similarity index 90%
rename from
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java
rename to
eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java
index 0311ceaef..974b19a54 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.eventmesh.openconnect.api.callback;
+package org.apache.eventmesh.openconnect.offsetmgmt.api.callback;
-public class SendExcepionContext {
+public class SendExceptionContext {
private String messageId;
private String topic;
private Throwable cause;
- public SendExcepionContext() {
+ public SendExceptionContext() {
}
public String getMessageId() {
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java
similarity index 87%
rename from
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java
rename to
eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java
index fd6baba7e..8346cf36b 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.eventmesh.openconnect.api.callback;
+package org.apache.eventmesh.openconnect.offsetmgmt.api.callback;
/**
* Message sending callback interface.
@@ -24,5 +24,5 @@ public interface SendMessageCallback {
void onSuccess(SendResult sendResult);
- void onException(SendExcepionContext sendExcepionContext);
+ void onException(SendExceptionContext sendExceptionContext);
}
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java
similarity index 95%
rename from
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java
rename to
eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java
index 8cd861f6d..9afc745f3 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.eventmesh.openconnect.api.callback;
+package org.apache.eventmesh.openconnect.offsetmgmt.api.callback;
public class SendResult {
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
index cda57e375..b3fc4346c 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
@@ -20,15 +20,19 @@ package
org.apache.eventmesh.openconnect.offsetmgmt.api.data;
import org.apache.eventmesh.common.remote.offset.RecordOffset;
import org.apache.eventmesh.common.remote.offset.RecordPartition;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
import java.util.Objects;
import java.util.Set;
+import java.util.UUID;
/**
* SourceDataEntries are generated by SourceTasks and passed to specific
message queue to store.
*/
public class ConnectRecord {
+ private final String recordId = UUID.randomUUID().toString();
+
private Long timestamp;
private Object data;
@@ -37,6 +41,8 @@ public class ConnectRecord {
private KeyValue extensions;
+ private SendMessageCallback callback;
+
public ConnectRecord() {
}
@@ -57,6 +63,10 @@ public class ConnectRecord {
this.data = data;
}
+ public String getRecordId() {
+ return recordId;
+ }
+
public Long getTimestamp() {
return timestamp;
}
@@ -127,6 +137,14 @@ public class ConnectRecord {
return this.extensions.getObject(key);
}
+ public SendMessageCallback getCallback() {
+ return callback;
+ }
+
+ public void setCallback(SendMessageCallback callback) {
+ this.callback = callback;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -136,19 +154,20 @@ public class ConnectRecord {
return false;
}
ConnectRecord that = (ConnectRecord) o;
- return Objects.equals(timestamp, that.timestamp) &&
Objects.equals(data, that.data)
+ return Objects.equals(recordId, that.recordId) &&
Objects.equals(timestamp, that.timestamp) && Objects.equals(data, that.data)
&& Objects.equals(position, that.position) &&
Objects.equals(extensions, that.extensions);
}
@Override
public int hashCode() {
- return Objects.hash(timestamp, data, position, extensions);
+ return Objects.hash(recordId, timestamp, data, position, extensions);
}
@Override
public String toString() {
return "ConnectRecord{"
- + "timestamp=" + timestamp
+ + "recordId=" + recordId
+ + ", timestamp=" + timestamp
+ ", data=" + data
+ ", position=" + position
+ ", extensions=" + extensions
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
index 0335a0956..6cd0452b8 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
@@ -38,12 +38,14 @@ import
org.apache.eventmesh.common.remote.response.FetchJobResponse;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.factory.ConnectorPluginFactory;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.api.source.Source;
+import
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import
org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffsetManagement;
import
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.DefaultOffsetManagementServiceImpl;
@@ -56,6 +58,7 @@ import org.apache.eventmesh.runtime.RuntimeInstanceConfig;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@@ -63,7 +66,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
-import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -207,6 +209,7 @@ public class ConnectorRuntime implements Runtime {
SourceConfig sourceConfig = (SourceConfig)
ConfigUtil.parse(connectorRuntimeConfig.getSourceConnectorConfig(),
sourceConnector.configClass());
SourceConnectorContext sourceConnectorContext = new
SourceConnectorContext();
sourceConnectorContext.setSourceConfig(sourceConfig);
+
sourceConnectorContext.setRuntimeConfig(connectorRuntimeConfig.getRuntimeConfig());
sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);
if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) {
sourceConnectorContext.setRecordPositionList(jobResponse.getPosition());
@@ -332,15 +335,36 @@ public class ConnectorRuntime implements Runtime {
reportVerifyRequest(record, connectorRuntimeConfig,
ConnectorStage.SOURCE);
}
+ // set a callback for this record
+ // if used the memory storage callback will be triggered
after sink put success
+ record.setCallback(new SendMessageCallback() {
+ @Override
+ public void onSuccess(SendResult result) {
+ // commit record
+ sourceConnector.commit(record);
+ Optional<RecordOffsetManagement.SubmittedPosition>
submittedRecordPosition = prepareToUpdateRecordOffset(record);
+
submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
+ Optional<SendMessageCallback> callback =
+
Optional.ofNullable(record.getExtensionObj(CALLBACK_EXTENSION)).map(v ->
(SendMessageCallback) v);
+ callback.ifPresent(cb ->
cb.onSuccess(convertToSendResult(record)));
+ }
+
+ @Override
+ public void onException(SendExceptionContext
sendExceptionContext) {
+ // handle exception
+ sourceConnector.onException(record);
+ log.error("send record to sink callback exception,
process will shut down, record: {}", record,
+ sendExceptionContext.getCause());
+ try {
+ stop();
+ } catch (Exception e) {
+ log.error("Failed to stop after exception", e);
+ }
+ }
+ });
+
queue.put(record);
- Optional<RecordOffsetManagement.SubmittedPosition>
submittedRecordPosition = prepareToUpdateRecordOffset(record);
- Optional<SendMessageCallback> callback =
-
Optional.ofNullable(record.getExtensionObj(CALLBACK_EXTENSION)).map(v ->
(SendMessageCallback) v);
- // commit record
- this.sourceConnector.commit(record);
-
submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
- // TODO:finish the optional callback
- // callback.ifPresent(cb -> cb.onSuccess(record));
+
offsetManagement.awaitAllMessages(5000,
TimeUnit.MILLISECONDS);
// update & commit offset
updateCommittableOffsets();
@@ -350,13 +374,20 @@ public class ConnectorRuntime implements Runtime {
}
}
+ private SendResult convertToSendResult(ConnectRecord record) {
+ SendResult result = new SendResult();
+ result.setMessageId(record.getRecordId());
+ if (StringUtils.isNotEmpty(record.getExtension("topic"))) {
+ result.setTopic(record.getExtension("topic"));
+ }
+ return result;
+ }
+
private void reportVerifyRequest(ConnectRecord record,
ConnectorRuntimeConfig connectorRuntimeConfig, ConnectorStage connectorStage) {
- UUID uuid = UUID.randomUUID();
- String recordId = uuid.toString();
String md5Str = md5(record.toString());
ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest();
reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID());
- reportVerifyRequest.setRecordID(recordId);
+ reportVerifyRequest.setRecordID(record.getRecordId());
reportVerifyRequest.setRecordSig(md5Str);
reportVerifyRequest.setConnectorName(
IPUtils.getLocalAddress() + "_" +
connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion());
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java
index 5a58cce08..ab6fc3aaf 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java
@@ -37,6 +37,8 @@ public class ConnectorRuntimeConfig {
private String region;
+ private Map<String, Object> runtimeConfig;
+
private String sourceConnectorType;
private String sourceConnectorDesc;
diff --git a/eventmesh-runtime-v2/src/main/resources/connector.yaml
b/eventmesh-runtime-v2/src/main/resources/connector.yaml
index bf7f58028..2e79e5ced 100644
--- a/eventmesh-runtime-v2/src/main/resources/connector.yaml
+++ b/eventmesh-runtime-v2/src/main/resources/connector.yaml
@@ -18,3 +18,4 @@
taskID: 1
jobID: 1
region: region1
+runtimeConfig: # this used for connector runtime config
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]