This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch eventmesh-function
in repository https://gitbox.apache.org/repos/asf/eventmesh.git

commit e89fa66455624cbebcd6dad39156b79f7c5a7fbd
Author: xwm1992 <[email protected]>
AuthorDate: Fri Jun 28 12:52:22 2024 +0800

    fix ack offset read & persist
---
 build.gradle                                       | 15 +----
 .../server/web/db/entity/EventMeshJobDetail.java   |  3 +-
 .../position/EventMeshPositionBizService.java      |  6 +-
 .../service/position/IFetchPositionHandler.java    |  4 +-
 .../position/impl/MysqlPositionHandler.java        | 14 +++--
 .../connector/rdb/canal/CanalSourceConfig.java     |  4 ++
 .../common/remote/response/FetchJobResponse.java   |  3 +-
 .../remote/response/FetchPositionResponse.java     |  6 +-
 .../eventmesh-connector-canal/build.gradle         |  1 -
 .../connector/canal/source/EntryParser.java        | 10 +++-
 .../source/connector/CanalSourceConnector.java     | 67 +++++++++++++++-------
 .../api/connector/SourceConnectorContext.java      |  6 ++
 .../offsetmgmt/admin/AdminOffsetService.java       | 23 +++++---
 .../runtime/connector/ConnectorRuntime.java        |  5 ++
 14 files changed, 110 insertions(+), 57 deletions(-)

diff --git a/build.gradle b/build.gradle
index 151f8767b..df86a4956 100644
--- a/build.gradle
+++ b/build.gradle
@@ -160,9 +160,10 @@ tasks.register('dist') {
     dependsOn('generateDistLicense', 'generateDistNotice')
     def includedProjects =
             ["eventmesh-common",
-             "eventmesh-admin-server",
              "eventmesh-meta:eventmesh-meta-api",
              "eventmesh-metrics-plugin:eventmesh-metrics-api",
+             "eventmesh-openconnect:eventmesh-openconnect-java",
+             
"eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api",
              "eventmesh-protocol-plugin:eventmesh-protocol-api",
              "eventmesh-registry:eventmesh-registry-api",
              "eventmesh-retry:eventmesh-retry-api",
@@ -663,16 +664,6 @@ subprojects {
 
     dependencyManagement {
         dependencies {
-            dependencySet(group: 'org.springframework', version: '5.3.31') {
-                entry 'spring-aop'
-                entry 'spring-beans'
-                entry 'spring-context'
-                entry 'spring-core'
-                entry 'spring-expression'
-                entry 'spring-jcl'
-                entry 'spring-jdbc'
-                entry 'spring-tx'
-            }
 
             dependency "org.apache.commons:commons-lang3:3.6"
             dependency "org.apache.commons:commons-collections4:4.4"
@@ -723,7 +714,7 @@ subprojects {
             dependency "com.mebigfatguy.fb-contrib:fb-contrib:7.6.0"
             dependency "com.jayway.jsonpath:json-path:2.9.0"
 
-            dependency 
"org.springframework.boot:spring-boot-starter-web:2.7.18"
+            dependency "org.springframework.boot:spring-boot-starter-web:2.5.4"
             dependency "io.openmessaging:registry-server:0.0.1"
 
             dependency "org.junit.jupiter:junit-jupiter:5.6.0"
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
index b4a836e8b..849a90a88 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
@@ -21,6 +21,7 @@ import org.apache.eventmesh.common.remote.JobState;
 import org.apache.eventmesh.common.remote.job.JobTransportType;
 import org.apache.eventmesh.common.remote.offset.RecordPosition;
 
+import java.util.List;
 import java.util.Map;
 
 import lombok.Data;
@@ -42,7 +43,7 @@ public class EventMeshJobDetail {
 
     private String sinkConnectorDesc;
 
-    private RecordPosition position;
+    private List<RecordPosition> position;
 
     private JobState state;
 }
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
index 7d6febdf4..d3b6ff555 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
@@ -25,6 +25,8 @@ import 
org.apache.eventmesh.common.remote.offset.RecordPosition;
 import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
 import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
 
+import java.util.List;
+
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -38,7 +40,7 @@ public class EventMeshPositionBizService {
     PositionHandlerFactory factory;
 
     // called isValidateReportRequest before call this
-    public RecordPosition getPosition(FetchPositionRequest request, Metadata 
metadata) {
+    public List<RecordPosition> getPosition(FetchPositionRequest request, 
Metadata metadata) {
         if (request == null) {
             return null;
         }
@@ -68,7 +70,7 @@ public class EventMeshPositionBizService {
         return handler.handler(request, metadata);
     }
 
-    public RecordPosition getPositionByJobID(Integer jobID, DataSourceType 
type) {
+    public List<RecordPosition> getPositionByJobID(Integer jobID, 
DataSourceType type) {
         if (jobID == null || type == null) {
             return null;
         }
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
index 9a4c324dc..2c039062f 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
@@ -21,10 +21,12 @@ import 
org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
 import org.apache.eventmesh.common.remote.offset.RecordPosition;
 import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
 
+import java.util.List;
+
 /**
  * IFetchPositionHandler
  */
 public interface IFetchPositionHandler {
 
-    RecordPosition handler(FetchPositionRequest request, Metadata metadata);
+    List<RecordPosition> handler(FetchPositionRequest request, Metadata 
metadata);
 }
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
index 623864fa6..d0d32a5c6 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
@@ -31,6 +31,7 @@ import 
org.apache.eventmesh.common.remote.request.FetchPositionRequest;
 import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
 import org.apache.eventmesh.common.utils.JsonUtils;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -142,20 +143,21 @@ public class MysqlPositionHandler extends PositionHandler 
{
     }
 
     @Override
-    public RecordPosition handler(FetchPositionRequest request, Metadata 
metadata) {
-        EventMeshMysqlPosition position = 
positionService.getOne(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
+    public List<RecordPosition> handler(FetchPositionRequest request, Metadata 
metadata) {
+        List<EventMeshMysqlPosition> positionList = 
positionService.list(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
             request.getJobID()));
-        RecordPosition recordPosition = null;
-        if (position != null) {
+        List<RecordPosition> recordPositionList = new ArrayList<>();
+        for (EventMeshMysqlPosition position : positionList) {
+            RecordPosition recordPosition = new RecordPosition();
             CanalRecordPartition partition = new CanalRecordPartition();
             partition.setTimeStamp(position.getTimestamp());
             partition.setJournalName(position.getJournalName());
             CanalRecordOffset offset = new CanalRecordOffset();
             offset.setOffset(position.getPosition());
-            recordPosition = new RecordPosition();
             recordPosition.setRecordPartition(partition);
             recordPosition.setRecordOffset(offset);
+            recordPositionList.add(recordPosition);
         }
-        return recordPosition;
+        return recordPositionList;
     }
 }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
index d6e6a7790..d75ceb6b5 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
@@ -49,6 +49,10 @@ public class CanalSourceConfig extends SourceConfig {
 
     private Long batchTimeout = -1L;
 
+    private String tableFilter;
+
+    private String fieldFilter;
+
     private List<RecordPosition> recordPositions;
 
     // ================================= channel parameter
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java
index 137e49bdc..a51cb32b9 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java
@@ -22,6 +22,7 @@ import org.apache.eventmesh.common.remote.exception.ErrorCode;
 import org.apache.eventmesh.common.remote.job.JobTransportType;
 import org.apache.eventmesh.common.remote.offset.RecordPosition;
 
+import java.util.List;
 import java.util.Map;
 
 import lombok.Data;
@@ -45,7 +46,7 @@ public class FetchJobResponse extends BaseRemoteResponse {
 
     private String sinkConnectorDesc;
 
-    private RecordPosition position;
+    private List<RecordPosition> position;
 
     private JobState state;
 
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java
index e9a7a3828..613623d65 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java
@@ -20,6 +20,8 @@ package org.apache.eventmesh.common.remote.response;
 import org.apache.eventmesh.common.remote.exception.ErrorCode;
 import org.apache.eventmesh.common.remote.offset.RecordPosition;
 
+import java.util.List;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 
@@ -27,7 +29,7 @@ import lombok.EqualsAndHashCode;
 @EqualsAndHashCode(callSuper = true)
 public class FetchPositionResponse extends BaseRemoteResponse {
 
-    private RecordPosition recordPosition;
+    private List<RecordPosition> recordPosition;
 
     public static FetchPositionResponse successResponse() {
         FetchPositionResponse response = new FetchPositionResponse();
@@ -36,7 +38,7 @@ public class FetchPositionResponse extends BaseRemoteResponse 
{
         return response;
     }
 
-    public static FetchPositionResponse successResponse(RecordPosition 
recordPosition) {
+    public static FetchPositionResponse successResponse(List<RecordPosition> 
recordPosition) {
         FetchPositionResponse response = successResponse();
         response.setRecordPosition(recordPosition);
         return response;
diff --git a/eventmesh-connectors/eventmesh-connector-canal/build.gradle 
b/eventmesh-connectors/eventmesh-connector-canal/build.gradle
index 0d914b7ae..640cb5ce4 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-canal/build.gradle
@@ -26,7 +26,6 @@ dependencies {
     implementation project(":eventmesh-common")
     implementation canal
     implementation "com.alibaba:druid:1.2.6"
-//    implementation "org.apache.ddlutils:ddlutils:1.0"
     compileOnly 'org.projectlombok:lombok'
     annotationProcessor 'org.projectlombok:lombok'
     testImplementation "org.mockito:mockito-core"
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
index c54462374..32c55ec42 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
@@ -26,6 +26,7 @@ import org.apache.eventmesh.connector.canal.model.EventType;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,11 +47,12 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class EntryParser {
 
-    public List<CanalConnectRecord> parse(CanalSourceConfig sourceConfig, 
List<Entry> datas) {
+    public Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig 
sourceConfig, List<Entry> datas) {
         List<CanalConnectRecord> recordList = new ArrayList<>();
         List<Entry> transactionDataBuffer = new ArrayList<>();
         // need check weather the entry is loopback
         boolean needSync;
+        Map<Long, List<CanalConnectRecord>> recordMap = new HashMap<>();
         try {
             for (Entry entry : datas) {
                 switch (entry.getEntryType()) {
@@ -63,17 +65,19 @@ public class EntryParser {
                         break;
                     case TRANSACTIONEND:
                         parseRecordListWithEntryBuffer(sourceConfig, 
recordList, transactionDataBuffer);
+                        if (!recordList.isEmpty()) {
+                            
recordMap.put(entry.getHeader().getLogfileOffset(), recordList);
+                        }
                         transactionDataBuffer.clear();
                         break;
                     default:
                         break;
                 }
             }
-            parseRecordListWithEntryBuffer(sourceConfig, recordList, 
transactionDataBuffer);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
-        return recordList;
+        return recordMap;
     }
 
     private void parseRecordListWithEntryBuffer(CanalSourceConfig 
sourceConfig, List<CanalConnectRecord> recordList,
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
index c179124ce..577142e00 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
@@ -31,16 +31,18 @@ import 
org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
 import org.apache.eventmesh.openconnect.api.source.Source;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.LockSupport;
 
-
 import com.alibaba.otter.canal.instance.core.CanalInstance;
 import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
 import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
@@ -74,7 +76,9 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
 
     private ClientIdentity clientIdentity;
 
-    private String filter = null;
+    private String tableFilter = null;
+
+    private String fieldFilter = null;
 
     private volatile boolean running = false;
 
@@ -95,6 +99,16 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
     public void init(ConnectorContext connectorContext) throws Exception {
         SourceConnectorContext sourceConnectorContext = 
(SourceConnectorContext) connectorContext;
         this.sourceConfig = (CanalSourceConfig) 
sourceConnectorContext.getSourceConfig();
+        if (sourceConnectorContext.getRecordPositionList() != null) {
+            
this.sourceConfig.setRecordPositions(sourceConnectorContext.getRecordPositionList());
+        }
+
+        if (StringUtils.isNotEmpty(sourceConfig.getTableFilter())) {
+            tableFilter = sourceConfig.getTableFilter();
+        }
+        if (StringUtils.isNotEmpty(sourceConfig.getFieldFilter())) {
+            fieldFilter = sourceConfig.getFieldFilter();
+        }
 
         canalServer = CanalServerWithEmbedded.instance();
 
@@ -103,7 +117,7 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
             public CanalInstance generate(String destination) {
                 Canal canal = buildCanal(sourceConfig);
 
-                CanalInstanceWithManager instance = new 
CanalInstanceWithManager(canal, filter) {
+                CanalInstanceWithManager instance = new 
CanalInstanceWithManager(canal, tableFilter) {
 
                     protected CanalHAController initHaController() {
                         return super.initHaController();
@@ -118,6 +132,9 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
                             ((MysqlEventParser) 
eventParser).setSupportBinlogImages("FULL");
                             MysqlEventParser mysqlEventParser = 
(MysqlEventParser) eventParser;
                             mysqlEventParser.setParallel(false);
+                            if (StringUtils.isNotEmpty(fieldFilter)) {
+                                mysqlEventParser.setFieldFilter(fieldFilter);
+                            }
 
                             CanalHAController haController = 
mysqlEventParser.getHaController();
                             if (!haController.isStart()) {
@@ -204,7 +221,7 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
         canalServer.start();
 
         canalServer.start(sourceConfig.getDestination());
-        this.clientIdentity = new 
ClientIdentity(sourceConfig.getDestination(), sourceConfig.getClientId(), 
filter);
+        this.clientIdentity = new 
ClientIdentity(sourceConfig.getDestination(), sourceConfig.getClientId(), 
tableFilter);
         canalServer.subscribe(clientIdentity);
 
         running = true;
@@ -274,23 +291,31 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
         EntryParser entryParser = new EntryParser();
 
         List<ConnectRecord> result = new ArrayList<>();
-
-        List<CanalConnectRecord> connectRecordList = 
entryParser.parse(sourceConfig, entries);
-
-        if (connectRecordList != null && !connectRecordList.isEmpty()) {
-            CanalConnectRecord lastRecord = 
connectRecordList.get(connectRecordList.size() - 1);
-
-            CanalRecordPartition canalRecordPartition = new 
CanalRecordPartition();
-            canalRecordPartition.setJournalName(lastRecord.getJournalName());
-            canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime());
-
-            CanalRecordOffset canalRecordOffset = new CanalRecordOffset();
-            canalRecordOffset.setOffset(lastRecord.getBinLogOffset());
-
-            ConnectRecord connectRecord = new 
ConnectRecord(canalRecordPartition, canalRecordOffset, 
System.currentTimeMillis());
-            connectRecord.addExtension("messageId", 
String.valueOf(message.getId()));
-            connectRecord.setData(connectRecordList);
-            result.add(connectRecord);
+        // key: Xid offset
+        Map<Long, List<CanalConnectRecord>> connectorRecordMap = 
entryParser.parse(sourceConfig, entries);
+
+        if (!connectorRecordMap.isEmpty()) {
+            Set<Map.Entry<Long, List<CanalConnectRecord>>> entrySet = 
connectorRecordMap.entrySet();
+            for (Map.Entry<Long, List<CanalConnectRecord>> entry : entrySet) {
+                // Xid offset
+                Long binLogOffset = entry.getKey();
+                List<CanalConnectRecord> connectRecordList = entry.getValue();
+                CanalConnectRecord lastRecord = 
entry.getValue().get(connectRecordList.size() - 1);
+                CanalRecordPartition canalRecordPartition = new 
CanalRecordPartition();
+                
canalRecordPartition.setJournalName(lastRecord.getJournalName());
+                canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime());
+
+                CanalRecordOffset canalRecordOffset = new CanalRecordOffset();
+                canalRecordOffset.setOffset(binLogOffset);
+
+                ConnectRecord connectRecord = new 
ConnectRecord(canalRecordPartition, canalRecordOffset, 
System.currentTimeMillis());
+                connectRecord.addExtension("messageId", 
String.valueOf(message.getId()));
+                connectRecord.setData(connectRecordList);
+                result.add(connectRecord);
+            }
+        } else {
+            // for the message has been filtered need ack message
+            canalServer.ack(clientIdentity, message.getId());
         }
 
         return result;
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
index 76800d9c2..55c88ce55 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
@@ -18,8 +18,11 @@
 package org.apache.eventmesh.openconnect.api.connector;
 
 import org.apache.eventmesh.common.config.connector.SourceConfig;
+import org.apache.eventmesh.common.remote.offset.RecordPosition;
 import 
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader;
 
+import java.util.List;
+
 import lombok.Data;
 
 /**
@@ -32,4 +35,7 @@ public class SourceConnectorContext implements 
ConnectorContext {
 
     public SourceConfig sourceConfig;
 
+    // initial record position
+    public List<RecordPosition> recordPositionList;
+
 }
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
index c011a1520..16a6fca3d 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
@@ -69,6 +69,8 @@ public class AdminOffsetService implements 
OffsetManagementService {
 
     public KeyValueStore<RecordPartition, RecordOffset> positionStore;
 
+    public KeyValueStore<String, RecordPosition> positionStore2;
+
     private String jobId;
 
     private JobState jobState;
@@ -106,6 +108,7 @@ public class AdminOffsetService implements 
OffsetManagementService {
         ReportPositionRequest reportPositionRequest = new 
ReportPositionRequest();
         reportPositionRequest.setJobID(jobId);
         reportPositionRequest.setState(jobState);
+        reportPositionRequest.setDataSourceType(dataSourceType);
         reportPositionRequest.setAddress(IPUtils.getLocalAddress());
 
         reportPositionRequest.setRecordPositionList(recordToSyncList);
@@ -119,6 +122,10 @@ public class AdminOffsetService implements 
OffsetManagementService {
                 .build())
             .build();
         requestObserver.onNext(payload);
+
+        for (Map.Entry<RecordPartition, RecordOffset> entry : 
recordMap.entrySet()) {
+            positionStore.remove(entry.getKey());
+        }
     }
 
     @Override
@@ -157,8 +164,9 @@ public class AdminOffsetService implements 
OffsetManagementService {
                     
JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), 
FetchPositionResponse.class);
                 assert fetchPositionResponse != null;
                 if (fetchPositionResponse.isSuccess()) {
-                    
positionStore.put(fetchPositionResponse.getRecordPosition().getRecordPartition(),
-                        
fetchPositionResponse.getRecordPosition().getRecordOffset());
+                    for (RecordPosition recordPosition : 
fetchPositionResponse.getRecordPosition()) {
+                        positionStore.put(recordPosition.getRecordPartition(), 
recordPosition.getRecordOffset());
+                    }
                 }
             }
         }
@@ -175,9 +183,9 @@ public class AdminOffsetService implements 
OffsetManagementService {
             fetchPositionRequest.setJobID(jobId);
             fetchPositionRequest.setAddress(IPUtils.getLocalAddress());
             fetchPositionRequest.setDataSourceType(dataSourceType);
-            RecordPosition recordPosition = new RecordPosition();
-            recordPosition.setRecordPartition(partition);
-            fetchPositionRequest.setRecordPosition(recordPosition);
+            RecordPosition fetchRecordPosition = new RecordPosition();
+            fetchRecordPosition.setRecordPartition(partition);
+            fetchPositionRequest.setRecordPosition(fetchRecordPosition);
 
             Metadata metadata = Metadata.newBuilder()
                 .setType(FetchPositionRequest.class.getSimpleName())
@@ -195,8 +203,9 @@ public class AdminOffsetService implements 
OffsetManagementService {
                     
JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), 
FetchPositionResponse.class);
                 assert fetchPositionResponse != null;
                 if (fetchPositionResponse.isSuccess()) {
-                    
positionStore.put(fetchPositionResponse.getRecordPosition().getRecordPartition(),
-                        
fetchPositionResponse.getRecordPosition().getRecordOffset());
+                    for (RecordPosition recordPosition : 
fetchPositionResponse.getRecordPosition()) {
+                        positionStore.put(recordPosition.getRecordPartition(), 
recordPosition.getRecordOffset());
+                    }
                 }
             }
         }
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
index 2f16834b4..65676903d 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
@@ -53,6 +53,8 @@ import org.apache.eventmesh.runtime.Runtime;
 import org.apache.eventmesh.runtime.RuntimeInstanceConfig;
 import org.apache.eventmesh.spi.EventMeshExtensionFactory;
 
+import org.apache.commons.collections4.CollectionUtils;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -201,6 +203,9 @@ public class ConnectorRuntime implements Runtime {
         SourceConnectorContext sourceConnectorContext = new 
SourceConnectorContext();
         sourceConnectorContext.setSourceConfig(sourceConfig);
         sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);
+        if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) {
+            
sourceConnectorContext.setRecordPositionList(jobResponse.getPosition());
+        }
 
         // spi load offsetMgmtService
         this.offsetManagement = new RecordOffsetManagement();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to