This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ba54c775 [ISSUE #5052] Enhancement for source\sink connector (#5066)
2ba54c775 is described below

commit 2ba54c7751a83348a41fc4eaa25898faaee11951
Author: mike_xwm <[email protected]>
AuthorDate: Thu Aug 1 14:35:25 2024 +0800

    [ISSUE #5052] Enhancement for source\sink connector (#5066)
    
    * [ISSUE #5040] Support gtid mode for sync data with mysql
    
    * fix conflicts with master
    
    * fix checkstyle error
    
    * [ISSUE #5044] Data synchronization strong verification in mariadb gtid 
mode
    
    * fix checkstyle error
    
    * [ISSUE #5048] Add report verify request to admin for connector runtime
    
    * fix checkstyle error
    
    * [ISSUE #5052] Enhancement for source\sink connector
    
    * fix checkstyle error
    
    * fix checkstyle error
---
 eventmesh-admin-server/conf/eventmesh.sql          | 52 +++++++-------------
 .../server/web/service/job/JobInfoBizService.java  |  1 -
 .../canal/sink/connector/CanalSinkConnector.java   | 41 ++++++++++++++--
 .../sink/connector/CanalSinkFullConnector.java     |  5 ++
 .../source/connector/CanalSourceConnector.java     |  5 ++
 .../source/connector/CanalSourceFullConnector.java |  5 ++
 .../source/connector/ChatGPTSourceConnector.java   |  5 ++
 .../sink/connector/DingDingSinkConnector.java      |  5 ++
 .../file/sink/connector/FileSinkConnector.java     |  5 ++
 .../file/source/connector/FileSourceConnector.java |  5 ++
 .../connector/http/sink/HttpSinkConnector.java     |  5 ++
 .../connector/http/source/HttpSourceConnector.java |  5 ++
 .../connector/jdbc/sink/JdbcSinkConnector.java     |  5 ++
 .../connector/jdbc/source/JdbcSourceConnector.java |  5 ++
 .../kafka/sink/connector/KafkaSinkConnector.java   |  5 ++
 .../source/connector/KafkaSourceConnector.java     |  5 ++
 .../sink/connector/KnativeSinkConnector.java       |  5 ++
 .../source/connector/KnativeSourceConnector.java   |  5 ++
 .../lark/sink/connector/LarkSinkConnector.java     |  5 ++
 .../sink/connector/MongodbSinkConnector.java       |  5 ++
 .../source/connector/MongodbSourceConnector.java   |  5 ++
 .../sink/connector/OpenFunctionSinkConnector.java  |  5 ++
 .../connector/OpenFunctionSourceConnector.java     |  5 ++
 .../sink/connector/PravegaSinkConnector.java       |  5 ++
 .../source/connector/PravegaSourceConnector.java   |  5 ++
 .../connector/PrometheusSourceConnector.java       |  5 ++
 .../pulsar/sink/connector/PulsarSinkConnector.java |  5 ++
 .../source/connector/PulsarSourceConnector.java    |  5 ++
 .../sink/connector/RabbitMQSinkConnector.java      |  5 ++
 .../source/connector/RabbitMQSourceConnector.java  |  5 ++
 .../redis/sink/connector/RedisSinkConnector.java   |  5 ++
 .../source/connector/RedisSourceConnector.java     |  5 ++
 .../sink/connector/RocketMQSinkConnector.java      |  5 ++
 .../source/connector/RocketMQSourceConnector.java  |  5 ++
 .../s3/source/connector/S3SourceConnector.java     |  5 ++
 .../slack/sink/connector/SlackSinkConnector.java   |  5 ++
 .../spring/sink/connector/SpringSinkConnector.java |  5 ++
 .../spring/source/MessageSendingOperations.java    |  2 +-
 .../source/connector/SpringSourceConnector.java    | 14 ++++--
 .../wechat/sink/connector/WeChatSinkConnector.java |  5 ++
 .../wecom/sink/connector/WeComSinkConnector.java   |  5 ++
 .../eventmesh/spring/pub/SpringPubController.java  | 10 ++--
 .../apache/eventmesh/openconnect/SourceWorker.java | 10 ++--
 .../openconnect/api/connector/Connector.java       | 11 ++++-
 .../api/connector/SourceConnectorContext.java      |  3 ++
 .../api/callback/SendExceptionContext.java}        |  6 +--
 .../api/callback/SendMessageCallback.java          |  4 +-
 .../offsetmgmt}/api/callback/SendResult.java       |  2 +-
 .../offsetmgmt/api/data/ConnectRecord.java         | 25 ++++++++--
 .../runtime/connector/ConnectorRuntime.java        | 57 +++++++++++++++++-----
 .../runtime/connector/ConnectorRuntimeConfig.java  |  2 +
 .../src/main/resources/connector.yaml              |  1 +
 52 files changed, 344 insertions(+), 77 deletions(-)

diff --git a/eventmesh-admin-server/conf/eventmesh.sql 
b/eventmesh-admin-server/conf/eventmesh.sql
index 586ab1c26..82d5c5331 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -23,11 +23,11 @@
 /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
 
 
--- 导出 eventmesh 的数据库结构
+-- export eventmesh database
 CREATE DATABASE IF NOT EXISTS `eventmesh` /*!40100 DEFAULT CHARACTER SET 
utf8mb4 COLLATE utf8mb4_general_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
 USE `eventmesh`;
 
--- 导出  表 eventmesh.event_mesh_data_source 结构
+-- export table eventmesh.event_mesh_data_source structure
 CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
   `id` int unsigned NOT NULL AUTO_INCREMENT,
   `dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -39,11 +39,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
   `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
   `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP,
   PRIMARY KEY (`id`) USING BTREE
-) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_general_ci;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
 
--- 导出  表 eventmesh.event_mesh_job_info 结构
+-- export table eventmesh.event_mesh_job_info structure
 CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
   `id` int unsigned NOT NULL AUTO_INCREMENT,
   `jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT 
NULL,
@@ -61,11 +59,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
   `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP,
   PRIMARY KEY (`id`) USING BTREE,
   UNIQUE KEY `jobID` (`jobID`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_general_ci;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
 
--- 导出  表 eventmesh.event_mesh_mysql_position 结构
+-- export table eventmesh.event_mesh_mysql_position structure
 CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
   `id` int unsigned NOT NULL AUTO_INCREMENT,
   `jobID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -80,11 +76,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
   `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP,
   PRIMARY KEY (`id`),
   UNIQUE KEY `jobID` (`jobID`)
-) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci 
ROW_FORMAT=DYNAMIC;
 
--- 导出  表 eventmesh.event_mesh_position_reporter_history 结构
+-- export table eventmesh.event_mesh_position_reporter_history structure
 CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` (
   `id` bigint NOT NULL AUTO_INCREMENT,
   `job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -94,27 +88,23 @@ CREATE TABLE IF NOT EXISTS 
`event_mesh_position_reporter_history` (
   PRIMARY KEY (`id`),
   KEY `job` (`job`),
   KEY `address` (`address`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_general_ci COMMENT='记录position上报者变更时,老记录';
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci 
COMMENT='record position reporter changes';
 
--- 导出  表 eventmesh.event_mesh_runtime_heartbeat 结构
+-- export table eventmesh.event_mesh_runtime_heartbeat structure
 CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` (
   `id` bigint unsigned NOT NULL AUTO_INCREMENT,
   `adminAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT 
NULL,
   `runtimeAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci 
NOT NULL,
   `jobID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
-  `reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci 
NOT NULL COMMENT 'runtime本地上报时间',
+  `reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci 
NOT NULL COMMENT 'runtime local report time',
   `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP,
   `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
   PRIMARY KEY (`id`),
   UNIQUE KEY `runtimeAddr` (`runtimeAddr`),
   KEY `jobID` (`jobID`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_general_ci;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
 
--- 导出  表 eventmesh.event_mesh_runtime_history 结构
+-- export table eventmesh.event_mesh_runtime_history structure
 CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` (
   `id` bigint NOT NULL AUTO_INCREMENT,
   `job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -122,17 +112,15 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` (
   `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
   PRIMARY KEY (`id`),
   KEY `address` (`address`)
-) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='记录runtime上运行任务的变更';
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci 
ROW_FORMAT=DYNAMIC COMMENT='record runtime task change history';
 
--- 导出  表 eventmesh.event_mesh_task_info 结构
+-- export table eventmesh.event_mesh_task_info structure
 CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
   `id` int unsigned NOT NULL AUTO_INCREMENT,
   `taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
   `name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
   `desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
-  `state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 
'TaskState',
+  `state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 
'taskstate',
   `fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
   `createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
   `updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -140,11 +128,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
   `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP,
   PRIMARY KEY (`id`) USING BTREE,
   UNIQUE KEY `taskID` (`taskID`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_general_ci;
-
--- 数据导出被取消选择。
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
 
--- 导出  表 eventmesh.event_mesh_verify 结构
+-- export table eventmesh.event_mesh_verify structure
 CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
   `id` int NOT NULL,
   `taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
@@ -157,8 +143,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
   PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
 
--- 数据导出被取消选择。
-
 /*!40101 SET SQL_MODE=IFNULL(@OLD_SQL_MODE, '') */;
 /*!40014 SET FOREIGN_KEY_CHECKS=IFNULL(@OLD_FOREIGN_KEY_CHECKS, 1) */;
 /*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
index 357cf5d99..9affa10e6 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
@@ -50,7 +50,6 @@ import lombok.extern.slf4j.Slf4j;
 
 /**
  * for table 'event_mesh_job_info' db operation
- * 2024-05-09 15:51:45
  */
 @Service
 @Slf4j
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
index 8ecda8e12..2ecb2384a 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
@@ -38,6 +38,8 @@ import 
org.apache.eventmesh.openconnect.api.ConnectorCreateService;
 import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
 import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
 import org.apache.eventmesh.openconnect.api.sink.Sink;
+import 
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
 import org.apache.commons.lang.StringUtils;
@@ -146,6 +148,11 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
         return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         executor.shutdown();
@@ -159,7 +166,7 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
             List<CanalConnectRecord> canalConnectRecordList = 
(List<CanalConnectRecord>) connectRecord.getData();
             canalConnectRecordList = filterRecord(canalConnectRecordList);
             if (isDdlDatas(canalConnectRecordList)) {
-                doDdl(context, canalConnectRecordList);
+                doDdl(context, canalConnectRecordList, connectRecord);
             } else if (sinkConfig.isGTIDMode()) {
                 doLoadWithGtid(context, sinkConfig, connectRecord);
             } else {
@@ -197,7 +204,7 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
             .collect(Collectors.toList());
     }
 
-    private void doDdl(DbLoadContext context, List<CanalConnectRecord> 
canalConnectRecordList) {
+    private void doDdl(DbLoadContext context, List<CanalConnectRecord> 
canalConnectRecordList, ConnectRecord connectRecord) {
         for (final CanalConnectRecord record : canalConnectRecordList) {
             try {
                 Boolean result = jdbcTemplate.execute(new 
StatementCallback<Boolean>() {
@@ -217,9 +224,30 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
                     context.getFailedRecords().add(record);
                 }
             } catch (Throwable e) {
+                
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord,
 e));
                 throw new RuntimeException(e);
             }
         }
+        
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
+    }
+
+    private SendExceptionContext buildSendExceptionContext(ConnectRecord 
record, Throwable e) {
+        SendExceptionContext sendExceptionContext = new SendExceptionContext();
+        sendExceptionContext.setMessageId(record.getRecordId());
+        sendExceptionContext.setCause(e);
+        if 
(org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) 
{
+            sendExceptionContext.setTopic(record.getExtension("topic"));
+        }
+        return sendExceptionContext;
+    }
+
+    private SendResult convertToSendResult(ConnectRecord record) {
+        SendResult result = new SendResult();
+        result.setMessageId(record.getRecordId());
+        if 
(org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) 
{
+            result.setTopic(record.getExtension("topic"));
+        }
+        return result;
     }
 
     private void doBefore(List<CanalConnectRecord> canalConnectRecordList, 
final DbLoadData loadData) {
@@ -291,6 +319,9 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
             Exception ex = null;
             try {
                 ex = result.get();
+                if (ex == null) {
+                    
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
+                }
             } catch (Exception e) {
                 ex = e;
             }
@@ -298,14 +329,16 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
             if (skipException != null && skipException) {
                 if (ex != null) {
                     // do skip
-                    log.warn("skip exception for data : {} , caused by {}",
+                    log.warn("skip exception will ack data : {} , caused by 
{}",
                         filteredRows,
                         ExceptionUtils.getFullStackTrace(ex));
                     GtidBatchManager.removeGtidBatch(gtid);
+                    
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
                 }
             } else {
                 if (ex != null) {
                     log.error("sink connector will shutdown by " + 
ex.getMessage(), ExceptionUtils.getFullStackTrace(ex));
+                    
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord,
 ex));
                     gtidSingleExecutor.shutdown();
                     System.exit(1);
                 } else {
@@ -314,6 +347,8 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
             }
         } else {
             log.info("Batch received, waiting for other batches.");
+            // ack this record
+            
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
         }
     }
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java
index 36c03b156..2b4c9d7a9 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java
@@ -109,6 +109,11 @@ public class CanalSinkFullConnector implements Sink, 
ConnectorCreateService<Sink
         return null;
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void put(List<ConnectRecord> sinkRecords) {
         if (sinkRecords == null || sinkRecords.isEmpty() || sinkRecords.get(0) 
== null) {
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
index f3f8b2e16..ea5ccdeed 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
@@ -267,6 +267,11 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
         return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         if (!running) {
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
index df3c7571c..97730463b 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
@@ -159,6 +159,11 @@ public class CanalSourceFullConnector extends 
AbstractComponent implements Sourc
         return this.config.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public List<ConnectRecord> poll() {
         while (flag.get()) {
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
index 4d54cb219..6b122087e 100644
--- 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
@@ -224,6 +224,11 @@ public class ChatGPTSourceConnector implements Source {
         return this.sourceConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         Throwable t = this.server.close().cause();
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
index 417d9cef3..8c5a1e661 100644
--- 
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
@@ -103,6 +103,11 @@ public class DingDingSinkConnector implements Sink {
         return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         isRunning = false;
diff --git 
a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java
index 89222b35b..fabae0d43 100644
--- 
a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java
@@ -103,6 +103,11 @@ public class FileSinkConnector implements Sink {
         return this.sinkConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         outputStream.flush();
diff --git 
a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java
index 6ea0a0d33..68b1a5098 100644
--- 
a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java
@@ -86,6 +86,11 @@ public class FileSourceConnector implements Source {
         return this.sourceConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         try {
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
index 6d38b4530..8a1475637 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
@@ -107,6 +107,11 @@ public class HttpSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
         return this.httpSinkConfig.connectorConfig.getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() throws Exception {
         this.sinkHandler.stop();
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
index 4155aff91..1ca325b18 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
@@ -144,6 +144,11 @@ public class HttpSourceConnector implements Source, 
ConnectorCreateService<Sourc
         return this.sourceConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         if (this.server != null) {
diff --git 
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java
index 39681bf17..cc00f1e14 100644
--- 
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java
@@ -139,6 +139,11 @@ public class JdbcSinkConnector implements Sink {
         return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     /**
      * Stops the Connector.
      *
diff --git 
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
index 2b2efcbef..810a59e72 100644
--- 
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
@@ -192,6 +192,11 @@ public class JdbcSourceConnector extends SourceConnector {
         return "JDBC Source Connector";
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     /**
      * Stops the Connector.
      *
diff --git 
a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java
index b257cd0f4..0adafc1ce 100644
--- 
a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java
@@ -94,6 +94,11 @@ public class KafkaSinkConnector implements Sink {
         return this.sinkConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         producer.close();
diff --git 
a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
index a3be1cbf9..d57312693 100644
--- 
a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
@@ -94,6 +94,11 @@ public class KafkaSourceConnector implements Source {
         return this.sourceConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         kafkaConsumer.unsubscribe();
diff --git 
a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java
index a12a1c746..b14f77ecd 100644
--- 
a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java
@@ -82,6 +82,11 @@ public class KnativeSinkConnector implements Sink {
         return this.sinkConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         started.compareAndSet(true, false);
diff --git 
a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java
index 537c1ad4d..1b0c033e8 100644
--- 
a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java
@@ -65,6 +65,11 @@ public class KnativeSourceConnector implements Source {
         return this.sourceConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         started.compareAndSet(true, false);
diff --git 
a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java
index d340dffd1..9981322e8 100644
--- 
a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java
@@ -110,6 +110,11 @@ public class LarkSinkConnector implements Sink {
         return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         if (!started.compareAndSet(true, false)) {
diff --git 
a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java
index 776ea8d71..1001ffa58 100644
--- 
a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java
@@ -87,6 +87,11 @@ public class MongodbSinkConnector implements Sink {
         return this.sinkConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() throws Exception {
         this.client.stop();
diff --git 
a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
index e57c39671..df3f66d6a 100644
--- 
a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
@@ -93,6 +93,11 @@ public class MongodbSourceConnector implements Source {
         return this.sourceConfig.connectorConfig.getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() throws Exception {
         this.client.stop();
diff --git 
a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java
index 63444efe2..0f00a7e38 100644
--- 
a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java
@@ -74,6 +74,11 @@ public class OpenFunctionSinkConnector implements Sink {
         return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
     }
diff --git 
a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
index b66bf9b18..534ecfb79 100644
--- 
a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
@@ -76,6 +76,11 @@ public class OpenFunctionSourceConnector implements Source {
         return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java
index e5f09e435..e089ef676 100644
--- 
a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java
@@ -109,6 +109,11 @@ public class PravegaSinkConnector implements Sink {
         return this.sinkConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         writerMap.forEach((topic, writer) -> writer.close());
diff --git 
a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
index 2611617d8..836779dbc 100644
--- 
a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
@@ -148,6 +148,11 @@ public class PravegaSourceConnector implements Source {
         return this.sourceConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         sourceHandlerMap.forEach((topic, handler) -> {
diff --git 
a/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
index 5c78c718e..0cafed73f 100644
--- 
a/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
@@ -145,6 +145,11 @@ public class PrometheusSourceConnector implements Source {
         return this.sourceConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         log.info("prometheus source connector stop.");
diff --git 
a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java
index 9ff1f22a2..3f90c6c1b 100644
--- 
a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java
@@ -85,6 +85,11 @@ public class PulsarSinkConnector implements Sink {
         return this.sinkConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         try {
diff --git 
a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java
index 212d3eb48..0bc576221 100644
--- 
a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java
@@ -87,6 +87,11 @@ public class PulsarSourceConnector implements Source {
         return this.sourceConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         try {
diff --git 
a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java
index 4a94a2cb1..08d1cefba 100644
--- 
a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java
@@ -95,6 +95,11 @@ public class RabbitMQSinkConnector implements Sink {
         return this.sinkConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         if (started) {
diff --git 
a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
index 655c20d9b..0b7e726bd 100644
--- 
a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
@@ -117,6 +117,11 @@ public class RabbitMQSourceConnector implements Source {
         return this.sourceConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         if (started) {
diff --git 
a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java
index 83c3498a9..5b7d27c3b 100644
--- 
a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java
@@ -85,6 +85,11 @@ public class RedisSinkConnector implements Sink {
         return this.sinkConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() throws Exception {
         this.redissonClient.shutdown();
diff --git 
a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
index 70adce59e..868639c20 100644
--- 
a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
@@ -94,6 +94,11 @@ public class RedisSourceConnector implements Source {
         return this.sourceConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() throws Exception {
         this.topic.removeAllListeners();
diff --git 
a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java
index ae9d4824e..31d45a28f 100644
--- 
a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java
@@ -78,6 +78,11 @@ public class RocketMQSinkConnector implements Sink, 
ConnectorCreateService<Sink>
         return this.sinkConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         producer.shutdown();
diff --git 
a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java
index 8ccb84acc..410f927d7 100644
--- 
a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java
@@ -206,6 +206,11 @@ public class RocketMQSourceConnector implements Source, 
ConnectorCreateService<S
         return this.sourceConfig.getConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         consumer.unsubscribe(sourceConfig.getConnectorConfig().getTopic());
diff --git 
a/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java
index d0dc30c15..078ed7691 100644
--- 
a/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java
@@ -121,6 +121,11 @@ public class S3SourceConnector implements Source {
         return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() throws Exception {
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
index e48760d50..836409af7 100644
--- 
a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
@@ -84,6 +84,11 @@ public class SlackSinkConnector implements Sink {
         return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() {
         isRunning = false;
diff --git 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java
index 94c40eea5..9ba99cd54 100644
--- 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java
@@ -77,6 +77,11 @@ public class SpringSinkConnector implements Sink {
         return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() throws Exception {
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
 
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
index a337c1cd8..5f38914bb 100644
--- 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
+++ 
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java
@@ -17,7 +17,7 @@
 
 package org.apache.eventmesh.connector.spring.source;
 
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
+import 
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
 
 /**
  * Operations for sending messages.
diff --git 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
index 2ab5a3a3c..db286eb60 100644
--- 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
@@ -25,10 +25,10 @@ import 
org.apache.eventmesh.common.remote.offset.spring.SpringRecordOffset;
 import org.apache.eventmesh.common.remote.offset.spring.SpringRecordPartition;
 import org.apache.eventmesh.connector.spring.source.MessageSendingOperations;
 import org.apache.eventmesh.openconnect.SourceWorker;
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
 import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
 import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
 import org.apache.eventmesh.openconnect.api.source.Source;
+import 
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
 import java.util.ArrayList;
@@ -95,6 +95,11 @@ public class SpringSourceConnector implements Source, 
MessageSendingOperations,
         return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() throws Exception {
 
@@ -123,6 +128,7 @@ public class SpringSourceConnector implements Source, 
MessageSendingOperations,
 
     /**
      * Send message.
+     *
      * @param message message to send
      */
     @Override
@@ -136,9 +142,9 @@ public class SpringSourceConnector implements Source, 
MessageSendingOperations,
 
     /**
      * Send message with a callback.
-     * @param message message to send.
-     * @param workerCallback After the user sends the message to the Connector,
-     *                       the SourceWorker will fetch message and invoke.
+     *
+     * @param message        message to send.
+     * @param workerCallback After the user sends the message to the 
Connector, the SourceWorker will fetch message and invoke.
      */
     @Override
     public void send(Object message, SendMessageCallback workerCallback) {
diff --git 
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
index dec3f5e5d..6908d119b 100644
--- 
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
@@ -115,6 +115,11 @@ public class WeChatSinkConnector implements Sink {
         return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() throws IOException {
         isRunning = false;
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
index ef6aed58c..ca628fa59 100644
--- 
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
@@ -95,6 +95,11 @@ public class WeComSinkConnector implements Sink {
         return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
     }
 
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
     @Override
     public void stop() throws IOException {
         isRunning = false;
diff --git 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java
 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java
index b7ea8890e..a734bb6ef 100644
--- 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java
+++ 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java
@@ -19,9 +19,9 @@ package org.apache.eventmesh.spring.pub;
 
 import org.apache.eventmesh.common.utils.JsonUtils;
 import 
org.apache.eventmesh.connector.spring.source.connector.SpringSourceConnector;
-import org.apache.eventmesh.openconnect.api.callback.SendExcepionContext;
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
-import org.apache.eventmesh.openconnect.api.callback.SendResult;
+import 
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import 
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -53,8 +53,8 @@ public class SpringPubController {
             }
 
             @Override
-            public void onException(SendExcepionContext sendExcepionContext) {
-                log.info("Spring source worker send message to EventMesh 
failed!", sendExcepionContext.getCause());
+            public void onException(SendExceptionContext sendExceptionContext) 
{
+                log.info("Spring source worker send message to EventMesh 
failed!", sendExceptionContext.getCause());
             }
         });
         return "success!";
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
index 6e48aa1de..2a2162a7a 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
@@ -32,11 +32,11 @@ import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.common.utils.SystemUtils;
-import org.apache.eventmesh.openconnect.api.callback.SendExcepionContext;
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
-import org.apache.eventmesh.openconnect.api.callback.SendResult;
 import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
 import org.apache.eventmesh.openconnect.api.source.Source;
+import 
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import 
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 import 
org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffsetManagement;
 import 
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.DefaultOffsetManagementServiceImpl;
@@ -264,8 +264,8 @@ public class SourceWorker implements ConnectorWorker {
         return result;
     }
 
-    private SendExcepionContext convertToExceptionContext(CloudEvent event, 
Throwable cause) {
-        SendExcepionContext exceptionContext = new SendExcepionContext();
+    private SendExceptionContext convertToExceptionContext(CloudEvent event, 
Throwable cause) {
+        SendExceptionContext exceptionContext = new SendExceptionContext();
         exceptionContext.setTopic(event.getId());
         exceptionContext.setMessageId(event.getId());
         exceptionContext.setCause(cause);
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
index 8ac09eac3..07e44aea9 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
@@ -34,8 +34,7 @@ public interface Connector extends ComponentLifeCycle {
     Class<? extends Config> configClass();
 
     /**
-     * This init method is obsolete. For detailed discussion,
-     * please see <a 
href="https://github.com/apache/eventmesh/issues/4565";>here</a>
+     * This init method is obsolete. For detailed discussion, please see <a 
href="https://github.com/apache/eventmesh/issues/4565";>here</a>
      * <p>
      * Initializes the Connector with the provided configuration.
      *
@@ -67,4 +66,12 @@ public interface Connector extends ComponentLifeCycle {
      */
     String name();
 
+    /**
+     * This method will be called when an exception occurs while processing a 
ConnectRecord object. This method can be used to handle the exception,
+     * such as logging error information, or stopping the connector's 
operation when an exception occurs.
+     *
+     * @param record The ConnectRecord object that was being processed when 
the exception occurred
+     */
+    void onException(ConnectRecord record);
+
 }
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
index 55c88ce55..f70e77248 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
@@ -22,6 +22,7 @@ import 
org.apache.eventmesh.common.remote.offset.RecordPosition;
 import 
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader;
 
 import java.util.List;
+import java.util.Map;
 
 import lombok.Data;
 
@@ -35,6 +36,8 @@ public class SourceConnectorContext implements 
ConnectorContext {
 
     public SourceConfig sourceConfig;
 
+    public Map<String, Object> runtimeConfig;
+
     // initial record position
     public List<RecordPosition> recordPositionList;
 
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java
similarity index 90%
rename from 
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java
rename to 
eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java
index 0311ceaef..974b19a54 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.openconnect.api.callback;
+package org.apache.eventmesh.openconnect.offsetmgmt.api.callback;
 
-public class SendExcepionContext {
+public class SendExceptionContext {
 
     private String messageId;
     private String topic;
     private Throwable cause;
 
-    public SendExcepionContext() {
+    public SendExceptionContext() {
     }
 
     public String getMessageId() {
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java
similarity index 87%
rename from 
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java
rename to 
eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java
index fd6baba7e..8346cf36b 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.openconnect.api.callback;
+package org.apache.eventmesh.openconnect.offsetmgmt.api.callback;
 
 /**
  * Message sending callback interface.
@@ -24,5 +24,5 @@ public interface SendMessageCallback {
 
     void onSuccess(SendResult sendResult);
 
-    void onException(SendExcepionContext sendExcepionContext);
+    void onException(SendExceptionContext sendExceptionContext);
 }
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java
similarity index 95%
rename from 
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java
rename to 
eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java
index 8cd861f6d..9afc745f3 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.openconnect.api.callback;
+package org.apache.eventmesh.openconnect.offsetmgmt.api.callback;
 
 public class SendResult {
 
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
index cda57e375..b3fc4346c 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
@@ -20,15 +20,19 @@ package 
org.apache.eventmesh.openconnect.offsetmgmt.api.data;
 import org.apache.eventmesh.common.remote.offset.RecordOffset;
 import org.apache.eventmesh.common.remote.offset.RecordPartition;
 import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import 
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
 
 import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 
 /**
  * SourceDataEntries are generated by SourceTasks and passed to specific 
message queue to store.
  */
 public class ConnectRecord {
 
+    private final String recordId = UUID.randomUUID().toString();
+
     private Long timestamp;
 
     private Object data;
@@ -37,6 +41,8 @@ public class ConnectRecord {
 
     private KeyValue extensions;
 
+    private SendMessageCallback callback;
+
     public ConnectRecord() {
 
     }
@@ -57,6 +63,10 @@ public class ConnectRecord {
         this.data = data;
     }
 
+    public String getRecordId() {
+        return recordId;
+    }
+
     public Long getTimestamp() {
         return timestamp;
     }
@@ -127,6 +137,14 @@ public class ConnectRecord {
         return this.extensions.getObject(key);
     }
 
+    public SendMessageCallback getCallback() {
+        return callback;
+    }
+
+    public void setCallback(SendMessageCallback callback) {
+        this.callback = callback;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -136,19 +154,20 @@ public class ConnectRecord {
             return false;
         }
         ConnectRecord that = (ConnectRecord) o;
-        return Objects.equals(timestamp, that.timestamp) && 
Objects.equals(data, that.data)
+        return Objects.equals(recordId, that.recordId) && 
Objects.equals(timestamp, that.timestamp) && Objects.equals(data, that.data)
             && Objects.equals(position, that.position) && 
Objects.equals(extensions, that.extensions);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(timestamp, data, position, extensions);
+        return Objects.hash(recordId, timestamp, data, position, extensions);
     }
 
     @Override
     public String toString() {
         return "ConnectRecord{"
-            + "timestamp=" + timestamp
+            + "recordId=" + recordId
+            + ", timestamp=" + timestamp
             + ", data=" + data
             + ", position=" + position
             + ", extensions=" + extensions
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
index 0335a0956..6cd0452b8 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
@@ -38,12 +38,14 @@ import 
org.apache.eventmesh.common.remote.response.FetchJobResponse;
 import org.apache.eventmesh.common.utils.IPUtils;
 import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
-import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
 import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
 import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
 import org.apache.eventmesh.openconnect.api.factory.ConnectorPluginFactory;
 import org.apache.eventmesh.openconnect.api.sink.Sink;
 import org.apache.eventmesh.openconnect.api.source.Source;
+import 
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import 
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 import 
org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffsetManagement;
 import 
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.DefaultOffsetManagementServiceImpl;
@@ -56,6 +58,7 @@ import org.apache.eventmesh.runtime.RuntimeInstanceConfig;
 import org.apache.eventmesh.spi.EventMeshExtensionFactory;
 
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -63,7 +66,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -207,6 +209,7 @@ public class ConnectorRuntime implements Runtime {
         SourceConfig sourceConfig = (SourceConfig) 
ConfigUtil.parse(connectorRuntimeConfig.getSourceConnectorConfig(), 
sourceConnector.configClass());
         SourceConnectorContext sourceConnectorContext = new 
SourceConnectorContext();
         sourceConnectorContext.setSourceConfig(sourceConfig);
+        
sourceConnectorContext.setRuntimeConfig(connectorRuntimeConfig.getRuntimeConfig());
         sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);
         if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) {
             
sourceConnectorContext.setRecordPositionList(jobResponse.getPosition());
@@ -332,15 +335,36 @@ public class ConnectorRuntime implements Runtime {
                         reportVerifyRequest(record, connectorRuntimeConfig, 
ConnectorStage.SOURCE);
                     }
 
+                    // set a callback for this record
+                    // if used the memory storage callback will be triggered 
after sink put success
+                    record.setCallback(new SendMessageCallback() {
+                        @Override
+                        public void onSuccess(SendResult result) {
+                            // commit record
+                            sourceConnector.commit(record);
+                            Optional<RecordOffsetManagement.SubmittedPosition> 
submittedRecordPosition = prepareToUpdateRecordOffset(record);
+                            
submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
+                            Optional<SendMessageCallback> callback =
+                                
Optional.ofNullable(record.getExtensionObj(CALLBACK_EXTENSION)).map(v -> 
(SendMessageCallback) v);
+                            callback.ifPresent(cb -> 
cb.onSuccess(convertToSendResult(record)));
+                        }
+
+                        @Override
+                        public void onException(SendExceptionContext 
sendExceptionContext) {
+                            // handle exception
+                            sourceConnector.onException(record);
+                            log.error("send record to sink callback exception, 
process will shut down, record: {}", record,
+                                sendExceptionContext.getCause());
+                            try {
+                                stop();
+                            } catch (Exception e) {
+                                log.error("Failed to stop after exception", e);
+                            }
+                        }
+                    });
+
                     queue.put(record);
-                    Optional<RecordOffsetManagement.SubmittedPosition> 
submittedRecordPosition = prepareToUpdateRecordOffset(record);
-                    Optional<SendMessageCallback> callback =
-                        
Optional.ofNullable(record.getExtensionObj(CALLBACK_EXTENSION)).map(v -> 
(SendMessageCallback) v);
-                    // commit record
-                    this.sourceConnector.commit(record);
-                    
submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
-                    // TODO:finish the optional callback
-                    // callback.ifPresent(cb -> cb.onSuccess(record));
+
                     offsetManagement.awaitAllMessages(5000, 
TimeUnit.MILLISECONDS);
                     // update & commit offset
                     updateCommittableOffsets();
@@ -350,13 +374,20 @@ public class ConnectorRuntime implements Runtime {
         }
     }
 
+    private SendResult convertToSendResult(ConnectRecord record) {
+        SendResult result = new SendResult();
+        result.setMessageId(record.getRecordId());
+        if (StringUtils.isNotEmpty(record.getExtension("topic"))) {
+            result.setTopic(record.getExtension("topic"));
+        }
+        return result;
+    }
+
     private void reportVerifyRequest(ConnectRecord record, 
ConnectorRuntimeConfig connectorRuntimeConfig, ConnectorStage connectorStage) {
-        UUID uuid = UUID.randomUUID();
-        String recordId = uuid.toString();
         String md5Str = md5(record.toString());
         ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest();
         reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID());
-        reportVerifyRequest.setRecordID(recordId);
+        reportVerifyRequest.setRecordID(record.getRecordId());
         reportVerifyRequest.setRecordSig(md5Str);
         reportVerifyRequest.setConnectorName(
             IPUtils.getLocalAddress() + "_" + 
connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion());
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java
index 5a58cce08..ab6fc3aaf 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java
@@ -37,6 +37,8 @@ public class ConnectorRuntimeConfig {
 
     private String region;
 
+    private Map<String, Object> runtimeConfig;
+
     private String sourceConnectorType;
 
     private String sourceConnectorDesc;
diff --git a/eventmesh-runtime-v2/src/main/resources/connector.yaml 
b/eventmesh-runtime-v2/src/main/resources/connector.yaml
index bf7f58028..2e79e5ced 100644
--- a/eventmesh-runtime-v2/src/main/resources/connector.yaml
+++ b/eventmesh-runtime-v2/src/main/resources/connector.yaml
@@ -18,3 +18,4 @@
 taskID: 1
 jobID: 1
 region: region1
+runtimeConfig: # this used for connector runtime config


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to