This is an automated email from the ASF dual-hosted git repository. mikexue pushed a commit to branch eventmesh-function in repository https://gitbox.apache.org/repos/asf/eventmesh.git
commit e89fa66455624cbebcd6dad39156b79f7c5a7fbd Author: xwm1992 <[email protected]> AuthorDate: Fri Jun 28 12:52:22 2024 +0800 fix ack offset read & persist --- build.gradle | 15 +---- .../server/web/db/entity/EventMeshJobDetail.java | 3 +- .../position/EventMeshPositionBizService.java | 6 +- .../service/position/IFetchPositionHandler.java | 4 +- .../position/impl/MysqlPositionHandler.java | 14 +++-- .../connector/rdb/canal/CanalSourceConfig.java | 4 ++ .../common/remote/response/FetchJobResponse.java | 3 +- .../remote/response/FetchPositionResponse.java | 6 +- .../eventmesh-connector-canal/build.gradle | 1 - .../connector/canal/source/EntryParser.java | 10 +++- .../source/connector/CanalSourceConnector.java | 67 +++++++++++++++------- .../api/connector/SourceConnectorContext.java | 6 ++ .../offsetmgmt/admin/AdminOffsetService.java | 23 +++++--- .../runtime/connector/ConnectorRuntime.java | 5 ++ 14 files changed, 110 insertions(+), 57 deletions(-) diff --git a/build.gradle b/build.gradle index 151f8767b..df86a4956 100644 --- a/build.gradle +++ b/build.gradle @@ -160,9 +160,10 @@ tasks.register('dist') { dependsOn('generateDistLicense', 'generateDistNotice') def includedProjects = ["eventmesh-common", - "eventmesh-admin-server", "eventmesh-meta:eventmesh-meta-api", "eventmesh-metrics-plugin:eventmesh-metrics-api", + "eventmesh-openconnect:eventmesh-openconnect-java", + "eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api", "eventmesh-protocol-plugin:eventmesh-protocol-api", "eventmesh-registry:eventmesh-registry-api", "eventmesh-retry:eventmesh-retry-api", @@ -663,16 +664,6 @@ subprojects { dependencyManagement { dependencies { - dependencySet(group: 'org.springframework', version: '5.3.31') { - entry 'spring-aop' - entry 'spring-beans' - entry 'spring-context' - entry 'spring-core' - entry 'spring-expression' - entry 'spring-jcl' - entry 'spring-jdbc' - entry 'spring-tx' - } dependency "org.apache.commons:commons-lang3:3.6" dependency "org.apache.commons:commons-collections4:4.4" @@ -723,7 +714,7 @@ subprojects { dependency "com.mebigfatguy.fb-contrib:fb-contrib:7.6.0" dependency "com.jayway.jsonpath:json-path:2.9.0" - dependency "org.springframework.boot:spring-boot-starter-web:2.7.18" + dependency "org.springframework.boot:spring-boot-starter-web:2.5.4" dependency "io.openmessaging:registry-server:0.0.1" dependency "org.junit.jupiter:junit-jupiter:5.6.0" diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java index b4a836e8b..849a90a88 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java @@ -21,6 +21,7 @@ import org.apache.eventmesh.common.remote.JobState; import org.apache.eventmesh.common.remote.job.JobTransportType; import org.apache.eventmesh.common.remote.offset.RecordPosition; +import java.util.List; import java.util.Map; import lombok.Data; @@ -42,7 +43,7 @@ public class EventMeshJobDetail { private String sinkConnectorDesc; - private RecordPosition position; + private List<RecordPosition> position; private JobState state; } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java index 7d6febdf4..d3b6ff555 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java @@ -25,6 +25,8 @@ import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.request.FetchPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; +import java.util.List; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -38,7 +40,7 @@ public class EventMeshPositionBizService { PositionHandlerFactory factory; // called isValidateReportRequest before call this - public RecordPosition getPosition(FetchPositionRequest request, Metadata metadata) { + public List<RecordPosition> getPosition(FetchPositionRequest request, Metadata metadata) { if (request == null) { return null; } @@ -68,7 +70,7 @@ public class EventMeshPositionBizService { return handler.handler(request, metadata); } - public RecordPosition getPositionByJobID(Integer jobID, DataSourceType type) { + public List<RecordPosition> getPositionByJobID(Integer jobID, DataSourceType type) { if (jobID == null || type == null) { return null; } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java index 9a4c324dc..2c039062f 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java @@ -21,10 +21,12 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.request.FetchPositionRequest; +import java.util.List; + /** * IFetchPositionHandler */ public interface IFetchPositionHandler { - RecordPosition handler(FetchPositionRequest request, Metadata metadata); + List<RecordPosition> handler(FetchPositionRequest request, Metadata metadata); } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java index 623864fa6..d0d32a5c6 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java @@ -31,6 +31,7 @@ import org.apache.eventmesh.common.remote.request.FetchPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; import org.apache.eventmesh.common.utils.JsonUtils; +import java.util.ArrayList; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; @@ -142,20 +143,21 @@ public class MysqlPositionHandler extends PositionHandler { } @Override - public RecordPosition handler(FetchPositionRequest request, Metadata metadata) { - EventMeshMysqlPosition position = positionService.getOne(Wrappers.<EventMeshMysqlPosition>query().eq("jobID", + public List<RecordPosition> handler(FetchPositionRequest request, Metadata metadata) { + List<EventMeshMysqlPosition> positionList = positionService.list(Wrappers.<EventMeshMysqlPosition>query().eq("jobID", request.getJobID())); - RecordPosition recordPosition = null; - if (position != null) { + List<RecordPosition> recordPositionList = new ArrayList<>(); + for (EventMeshMysqlPosition position : positionList) { + RecordPosition recordPosition = new RecordPosition(); CanalRecordPartition partition = new CanalRecordPartition(); partition.setTimeStamp(position.getTimestamp()); partition.setJournalName(position.getJournalName()); CanalRecordOffset offset = new CanalRecordOffset(); offset.setOffset(position.getPosition()); - recordPosition = new RecordPosition(); recordPosition.setRecordPartition(partition); recordPosition.setRecordOffset(offset); + recordPositionList.add(recordPosition); } - return recordPosition; + return recordPositionList; } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java index d6e6a7790..d75ceb6b5 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java @@ -49,6 +49,10 @@ public class CanalSourceConfig extends SourceConfig { private Long batchTimeout = -1L; + private String tableFilter; + + private String fieldFilter; + private List<RecordPosition> recordPositions; // ================================= channel parameter diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java index 137e49bdc..a51cb32b9 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java @@ -22,6 +22,7 @@ import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.job.JobTransportType; import org.apache.eventmesh.common.remote.offset.RecordPosition; +import java.util.List; import java.util.Map; import lombok.Data; @@ -45,7 +46,7 @@ public class FetchJobResponse extends BaseRemoteResponse { private String sinkConnectorDesc; - private RecordPosition position; + private List<RecordPosition> position; private JobState state; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java index e9a7a3828..613623d65 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java @@ -20,6 +20,8 @@ package org.apache.eventmesh.common.remote.response; import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.offset.RecordPosition; +import java.util.List; + import lombok.Data; import lombok.EqualsAndHashCode; @@ -27,7 +29,7 @@ import lombok.EqualsAndHashCode; @EqualsAndHashCode(callSuper = true) public class FetchPositionResponse extends BaseRemoteResponse { - private RecordPosition recordPosition; + private List<RecordPosition> recordPosition; public static FetchPositionResponse successResponse() { FetchPositionResponse response = new FetchPositionResponse(); @@ -36,7 +38,7 @@ public class FetchPositionResponse extends BaseRemoteResponse { return response; } - public static FetchPositionResponse successResponse(RecordPosition recordPosition) { + public static FetchPositionResponse successResponse(List<RecordPosition> recordPosition) { FetchPositionResponse response = successResponse(); response.setRecordPosition(recordPosition); return response; diff --git a/eventmesh-connectors/eventmesh-connector-canal/build.gradle b/eventmesh-connectors/eventmesh-connector-canal/build.gradle index 0d914b7ae..640cb5ce4 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/build.gradle +++ b/eventmesh-connectors/eventmesh-connector-canal/build.gradle @@ -26,7 +26,6 @@ dependencies { implementation project(":eventmesh-common") implementation canal implementation "com.alibaba:druid:1.2.6" -// implementation "org.apache.ddlutils:ddlutils:1.0" compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testImplementation "org.mockito:mockito-core" diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java index c54462374..32c55ec42 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java @@ -26,6 +26,7 @@ import org.apache.eventmesh.connector.canal.model.EventType; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -46,11 +47,12 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class EntryParser { - public List<CanalConnectRecord> parse(CanalSourceConfig sourceConfig, List<Entry> datas) { + public Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig sourceConfig, List<Entry> datas) { List<CanalConnectRecord> recordList = new ArrayList<>(); List<Entry> transactionDataBuffer = new ArrayList<>(); // need check weather the entry is loopback boolean needSync; + Map<Long, List<CanalConnectRecord>> recordMap = new HashMap<>(); try { for (Entry entry : datas) { switch (entry.getEntryType()) { @@ -63,17 +65,19 @@ public class EntryParser { break; case TRANSACTIONEND: parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer); + if (!recordList.isEmpty()) { + recordMap.put(entry.getHeader().getLogfileOffset(), recordList); + } transactionDataBuffer.clear(); break; default: break; } } - parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer); } catch (Exception e) { throw new RuntimeException(e); } - return recordList; + return recordMap; } private void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig, List<CanalConnectRecord> recordList, diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index c179124ce..577142e00 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -31,16 +31,18 @@ import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.commons.lang3.StringUtils; + import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; - import com.alibaba.otter.canal.instance.core.CanalInstance; import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator; import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager; @@ -74,7 +76,9 @@ public class CanalSourceConnector implements Source, ConnectorCreateService<Sour private ClientIdentity clientIdentity; - private String filter = null; + private String tableFilter = null; + + private String fieldFilter = null; private volatile boolean running = false; @@ -95,6 +99,16 @@ public class CanalSourceConnector implements Source, ConnectorCreateService<Sour public void init(ConnectorContext connectorContext) throws Exception { SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext; this.sourceConfig = (CanalSourceConfig) sourceConnectorContext.getSourceConfig(); + if (sourceConnectorContext.getRecordPositionList() != null) { + this.sourceConfig.setRecordPositions(sourceConnectorContext.getRecordPositionList()); + } + + if (StringUtils.isNotEmpty(sourceConfig.getTableFilter())) { + tableFilter = sourceConfig.getTableFilter(); + } + if (StringUtils.isNotEmpty(sourceConfig.getFieldFilter())) { + fieldFilter = sourceConfig.getFieldFilter(); + } canalServer = CanalServerWithEmbedded.instance(); @@ -103,7 +117,7 @@ public class CanalSourceConnector implements Source, ConnectorCreateService<Sour public CanalInstance generate(String destination) { Canal canal = buildCanal(sourceConfig); - CanalInstanceWithManager instance = new CanalInstanceWithManager(canal, filter) { + CanalInstanceWithManager instance = new CanalInstanceWithManager(canal, tableFilter) { protected CanalHAController initHaController() { return super.initHaController(); @@ -118,6 +132,9 @@ public class CanalSourceConnector implements Source, ConnectorCreateService<Sour ((MysqlEventParser) eventParser).setSupportBinlogImages("FULL"); MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser; mysqlEventParser.setParallel(false); + if (StringUtils.isNotEmpty(fieldFilter)) { + mysqlEventParser.setFieldFilter(fieldFilter); + } CanalHAController haController = mysqlEventParser.getHaController(); if (!haController.isStart()) { @@ -204,7 +221,7 @@ public class CanalSourceConnector implements Source, ConnectorCreateService<Sour canalServer.start(); canalServer.start(sourceConfig.getDestination()); - this.clientIdentity = new ClientIdentity(sourceConfig.getDestination(), sourceConfig.getClientId(), filter); + this.clientIdentity = new ClientIdentity(sourceConfig.getDestination(), sourceConfig.getClientId(), tableFilter); canalServer.subscribe(clientIdentity); running = true; @@ -274,23 +291,31 @@ public class CanalSourceConnector implements Source, ConnectorCreateService<Sour EntryParser entryParser = new EntryParser(); List<ConnectRecord> result = new ArrayList<>(); - - List<CanalConnectRecord> connectRecordList = entryParser.parse(sourceConfig, entries); - - if (connectRecordList != null && !connectRecordList.isEmpty()) { - CanalConnectRecord lastRecord = connectRecordList.get(connectRecordList.size() - 1); - - CanalRecordPartition canalRecordPartition = new CanalRecordPartition(); - canalRecordPartition.setJournalName(lastRecord.getJournalName()); - canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime()); - - CanalRecordOffset canalRecordOffset = new CanalRecordOffset(); - canalRecordOffset.setOffset(lastRecord.getBinLogOffset()); - - ConnectRecord connectRecord = new ConnectRecord(canalRecordPartition, canalRecordOffset, System.currentTimeMillis()); - connectRecord.addExtension("messageId", String.valueOf(message.getId())); - connectRecord.setData(connectRecordList); - result.add(connectRecord); + // key: Xid offset + Map<Long, List<CanalConnectRecord>> connectorRecordMap = entryParser.parse(sourceConfig, entries); + + if (!connectorRecordMap.isEmpty()) { + Set<Map.Entry<Long, List<CanalConnectRecord>>> entrySet = connectorRecordMap.entrySet(); + for (Map.Entry<Long, List<CanalConnectRecord>> entry : entrySet) { + // Xid offset + Long binLogOffset = entry.getKey(); + List<CanalConnectRecord> connectRecordList = entry.getValue(); + CanalConnectRecord lastRecord = entry.getValue().get(connectRecordList.size() - 1); + CanalRecordPartition canalRecordPartition = new CanalRecordPartition(); + canalRecordPartition.setJournalName(lastRecord.getJournalName()); + canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime()); + + CanalRecordOffset canalRecordOffset = new CanalRecordOffset(); + canalRecordOffset.setOffset(binLogOffset); + + ConnectRecord connectRecord = new ConnectRecord(canalRecordPartition, canalRecordOffset, System.currentTimeMillis()); + connectRecord.addExtension("messageId", String.valueOf(message.getId())); + connectRecord.setData(connectRecordList); + result.add(connectRecord); + } + } else { + // for the message has been filtered need ack message + canalServer.ack(clientIdentity, message.getId()); } return result; diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java index 76800d9c2..55c88ce55 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java @@ -18,8 +18,11 @@ package org.apache.eventmesh.openconnect.api.connector; import org.apache.eventmesh.common.config.connector.SourceConfig; +import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader; +import java.util.List; + import lombok.Data; /** @@ -32,4 +35,7 @@ public class SourceConnectorContext implements ConnectorContext { public SourceConfig sourceConfig; + // initial record position + public List<RecordPosition> recordPositionList; + } diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java index c011a1520..16a6fca3d 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java @@ -69,6 +69,8 @@ public class AdminOffsetService implements OffsetManagementService { public KeyValueStore<RecordPartition, RecordOffset> positionStore; + public KeyValueStore<String, RecordPosition> positionStore2; + private String jobId; private JobState jobState; @@ -106,6 +108,7 @@ public class AdminOffsetService implements OffsetManagementService { ReportPositionRequest reportPositionRequest = new ReportPositionRequest(); reportPositionRequest.setJobID(jobId); reportPositionRequest.setState(jobState); + reportPositionRequest.setDataSourceType(dataSourceType); reportPositionRequest.setAddress(IPUtils.getLocalAddress()); reportPositionRequest.setRecordPositionList(recordToSyncList); @@ -119,6 +122,10 @@ public class AdminOffsetService implements OffsetManagementService { .build()) .build(); requestObserver.onNext(payload); + + for (Map.Entry<RecordPartition, RecordOffset> entry : recordMap.entrySet()) { + positionStore.remove(entry.getKey()); + } } @Override @@ -157,8 +164,9 @@ public class AdminOffsetService implements OffsetManagementService { JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), FetchPositionResponse.class); assert fetchPositionResponse != null; if (fetchPositionResponse.isSuccess()) { - positionStore.put(fetchPositionResponse.getRecordPosition().getRecordPartition(), - fetchPositionResponse.getRecordPosition().getRecordOffset()); + for (RecordPosition recordPosition : fetchPositionResponse.getRecordPosition()) { + positionStore.put(recordPosition.getRecordPartition(), recordPosition.getRecordOffset()); + } } } } @@ -175,9 +183,9 @@ public class AdminOffsetService implements OffsetManagementService { fetchPositionRequest.setJobID(jobId); fetchPositionRequest.setAddress(IPUtils.getLocalAddress()); fetchPositionRequest.setDataSourceType(dataSourceType); - RecordPosition recordPosition = new RecordPosition(); - recordPosition.setRecordPartition(partition); - fetchPositionRequest.setRecordPosition(recordPosition); + RecordPosition fetchRecordPosition = new RecordPosition(); + fetchRecordPosition.setRecordPartition(partition); + fetchPositionRequest.setRecordPosition(fetchRecordPosition); Metadata metadata = Metadata.newBuilder() .setType(FetchPositionRequest.class.getSimpleName()) @@ -195,8 +203,9 @@ public class AdminOffsetService implements OffsetManagementService { JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), FetchPositionResponse.class); assert fetchPositionResponse != null; if (fetchPositionResponse.isSuccess()) { - positionStore.put(fetchPositionResponse.getRecordPosition().getRecordPartition(), - fetchPositionResponse.getRecordPosition().getRecordOffset()); + for (RecordPosition recordPosition : fetchPositionResponse.getRecordPosition()) { + positionStore.put(recordPosition.getRecordPartition(), recordPosition.getRecordOffset()); + } } } } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java index 2f16834b4..65676903d 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java @@ -53,6 +53,8 @@ import org.apache.eventmesh.runtime.Runtime; import org.apache.eventmesh.runtime.RuntimeInstanceConfig; import org.apache.eventmesh.spi.EventMeshExtensionFactory; +import org.apache.commons.collections4.CollectionUtils; + import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -201,6 +203,9 @@ public class ConnectorRuntime implements Runtime { SourceConnectorContext sourceConnectorContext = new SourceConnectorContext(); sourceConnectorContext.setSourceConfig(sourceConfig); sourceConnectorContext.setOffsetStorageReader(offsetStorageReader); + if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) { + sourceConnectorContext.setRecordPositionList(jobResponse.getPosition()); + } // spi load offsetMgmtService this.offsetManagement = new RecordOffsetManagement(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
