This is an automated email from the ASF dual-hosted git repository. mikexue pushed a commit to branch add-for-test in repository https://gitbox.apache.org/repos/asf/eventmesh.git
commit f3eacde4edfa71af9ad31e1c1207c0b1313dee7a Author: xwm1992 <[email protected]> AuthorDate: Mon Aug 19 23:07:10 2024 +0800 add for test --- eventmesh-admin-server/bin/start-admin.sh | 51 +++++------ eventmesh-admin-server/build.gradle | 2 + eventmesh-admin-server/conf/application.yaml | 8 +- eventmesh-admin-server/conf/eventmesh.sql | 2 +- .../conf/mapper/EventMeshVerifyMapper.xml | 5 +- .../eventmesh/admin/server/web/HttpServer.java | 22 +++++ .../admin/server/web/db/DBThreadPool.java | 26 +++++- .../server/web/db/entity/EventMeshVerify.java | 2 + .../web/handler/impl/FetchJobRequestHandler.java | 2 +- ...fyHandler.java => ReportJobRequestHandler.java} | 37 +++++--- .../web/handler/impl/ReportPositionHandler.java | 1 + .../web/handler/impl/ReportVerifyHandler.java | 50 ++++++++-- .../server/web/service/job/JobInfoBizService.java | 81 ++++++++++++++-- .../service/position/impl/HttpPositionHandler.java | 57 ++++++++++++ .../web/service/verify/VerifyBizService.java | 1 + .../eventmesh/common/config/ConfigService.java | 11 ++- .../config/connector/http}/HttpRetryConfig.java | 2 +- .../config/connector/http}/HttpSinkConfig.java | 2 +- .../config/connector/http}/HttpWebhookConfig.java | 2 +- .../connector/http}/SinkConnectorConfig.java | 11 +-- .../connector/http/SourceConnectorConfig.java | 6 +- .../apache/eventmesh/common/remote/JobState.java | 51 +++++++++++ .../eventmesh/common/remote/TransportType.java | 1 + ...ortVerifyRequest.java => ReportJobRequest.java} | 14 +-- .../common/remote/request/ReportVerifyRequest.java | 2 + .../apache/eventmesh/common/utils/JsonUtils.java | 11 +++ ...apache.eventmesh.common.remote.payload.IPayload | 1 + .../canal/sink/connector/CanalSinkConnector.java | 24 ++++- .../source/connector/CanalSourceConnector.java | 5 +- .../connector/http/sink/HttpSinkConnector.java | 4 +- .../http/sink/data/HttpConnectRecord.java | 40 ++++++-- .../http/sink/handler/AbstractHttpSinkHandler.java | 4 +- .../http/sink/handler/HttpSinkHandler.java | 8 +- .../sink/handler/impl/CommonHttpSinkHandler.java | 64 +++++++------ .../handler/impl/HttpSinkHandlerRetryWrapper.java | 4 +- .../sink/handler/impl/WebhookHttpSinkHandler.java | 12 +-- .../connector/http/source/data/WebhookRequest.java | 3 + .../http/source/protocol/impl/CommonProtocol.java | 31 ++++++- .../http/source/protocol/impl/GitHubProtocol.java | 2 +- ...ventmesh.openconnect.api.ConnectorCreateService | 12 +-- .../connector/http/sink/HttpSinkConnectorTest.java | 4 +- .../api/connector/SinkConnectorContext.java | 4 + .../offsetmgmt/admin/AdminOffsetService.java | 3 + .../offsetmgmt/api/data/DefaultKeyValue.java | 5 + eventmesh-runtime-v2/build.gradle | 1 + .../runtime/connector/ConnectorRuntime.java | 102 ++++++++++++++++----- 46 files changed, 607 insertions(+), 186 deletions(-) diff --git a/eventmesh-admin-server/bin/start-admin.sh b/eventmesh-admin-server/bin/start-admin.sh index 93c364439..163303661 100644 --- a/eventmesh-admin-server/bin/start-admin.sh +++ b/eventmesh-admin-server/bin/start-admin.sh @@ -56,34 +56,34 @@ function extract_java_version { #} function get_pid { - local ppid="" - if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then - ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file) - # If the process does not exist, it indicates that the previous process terminated abnormally. + local ppid="" + if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then + ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file) + # If the process does not exist, it indicates that the previous process terminated abnormally. if [ ! -d /proc/$ppid ]; then # Remove the residual file. rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file echo -e "ERROR\t EventMesh process had already terminated unexpectedly before, please check log output." ppid="" fi - else - if [[ $OS =~ Msys ]]; then - # There is a Bug on Msys that may not be able to kill the identified process - ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}` - elif [[ $OS =~ Darwin ]]; then - # Known problem: grep Java may not be able to accurately identify Java processes - ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'}) - else - if [ $DOCKER ]; then - # No need to exclude root user in Docker containers. - ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'}) - else + else + if [[ $OS =~ Msys ]]; then + # There is a Bug on Msys that may not be able to kill the identified process + ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}` + elif [[ $OS =~ Darwin ]]; then + # Known problem: grep Java may not be able to accurately identify Java processes + ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + else + if [ $DOCKER ]; then + # No need to exclude root user in Docker containers. + ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'}) + else # It is required to identify the process as accurately as possible on Linux. ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk -F ' ' {'print $2'}) fi - fi - fi - echo "$ppid"; + fi + fi + echo "$ppid"; } #=========================================================================================== @@ -136,8 +136,7 @@ export JAVA_HOME GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log" -#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4" -JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}` +JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g" JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50" JAVA_OPT="${JAVA_OPT} -verbose:gc" if [[ "$JAVA_VERSION" == "8" ]]; then @@ -172,7 +171,7 @@ JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin" # echo "proxy is running already" # exit 9; # else -# echo "err pid$pid, rm pid.file" +# echo "err pid$pid, rm pid.file" # rm pid.file # fi #fi @@ -183,8 +182,8 @@ if [[ $pid == "ERROR"* ]]; then exit 9 fi if [ -n "$pid" ]; then - echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again." - exit 9 + echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again." + exit 9 fi make_logs_dir @@ -193,9 +192,9 @@ echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> ${EVENTMESH_ADMIN_LOG_H EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer if [ $DOCKER ]; then - $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out + $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out else - $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 & + $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 & echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file fi exit 0 diff --git a/eventmesh-admin-server/build.gradle b/eventmesh-admin-server/build.gradle index 1fec2c7c5..95c8fa137 100644 --- a/eventmesh-admin-server/build.gradle +++ b/eventmesh-admin-server/build.gradle @@ -38,6 +38,8 @@ dependencies { implementation "com.alibaba:druid-spring-boot-starter" compileOnly 'com.mysql:mysql-connector-j' compileOnly 'org.projectlombok:lombok' + testImplementation 'junit:junit:4.12' + testImplementation 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' } diff --git a/eventmesh-admin-server/conf/application.yaml b/eventmesh-admin-server/conf/application.yaml index 274196db6..3d702e579 100644 --- a/eventmesh-admin-server/conf/application.yaml +++ b/eventmesh-admin-server/conf/application.yaml @@ -35,8 +35,8 @@ event-mesh: # grpc server port port: 8081 adminServerList: - region1: + R1: - http://localhost:8082 - region2: - - http://localhost:8083 - region: region1 \ No newline at end of file + R2: + - http://localhost:8082 + region: R1 \ No newline at end of file diff --git a/eventmesh-admin-server/conf/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql index 986320570..6e28daca8 100644 --- a/eventmesh-admin-server/conf/eventmesh.sql +++ b/eventmesh-admin-server/conf/eventmesh.sql @@ -102,7 +102,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` ( `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`), - UNIQUE KEY `runtimeAddr` (`runtimeAddr`), KEY `jobID` (`jobID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; @@ -137,6 +136,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` ( CREATE TABLE IF NOT EXISTS `event_mesh_verify` ( `id` int unsigned NOT NULL AUTO_INCREMENT, `taskID` varchar(50) COLLATE utf8_bin DEFAULT NULL, + `jobID` varchar(50) COLLATE utf8_bin DEFAULT NULL, `recordID` varchar(50) COLLATE utf8_bin DEFAULT NULL, `recordSig` varchar(50) COLLATE utf8_bin DEFAULT NULL, `connectorName` varchar(200) COLLATE utf8_bin DEFAULT NULL, diff --git a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml index b7b042145..45727498c 100644 --- a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml +++ b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml @@ -26,6 +26,7 @@ <resultMap id="BaseResultMap" type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify"> <id property="id" column="id" jdbcType="INTEGER"/> <result property="taskID" column="taskID" jdbcType="VARCHAR"/> + <result property="jobID" column="jobID" jdbcType="VARCHAR"/> <result property="recordID" column="recordID" jdbcType="VARCHAR"/> <result property="recordSig" column="recordSig" jdbcType="VARCHAR"/> <result property="connectorName" column="connectorName" jdbcType="VARCHAR"/> @@ -35,8 +36,8 @@ </resultMap> <sql id="Base_Column_List"> - id,taskID,recordID, - recordSig,connectorName,connectorStage, + id,taskID,jobID,recordID, + recordSig,connectorName,connectorStage, position,createTime </sql> </mapper> diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java index 12afb3a3d..8350802f7 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java @@ -17,8 +17,11 @@ package org.apache.eventmesh.admin.server.web; +import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.admin.server.web.service.task.TaskBizService; +import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService; import org.apache.eventmesh.common.remote.request.CreateTaskRequest; +import org.apache.eventmesh.common.remote.request.ReportVerifyRequest; import org.apache.eventmesh.common.remote.response.CreateTaskResponse; import org.apache.eventmesh.common.utils.JsonUtils; @@ -31,17 +34,36 @@ import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/eventmesh/admin") +@Slf4j public class HttpServer { @Autowired private TaskBizService taskService; + @Autowired + private VerifyBizService verifyService; + @RequestMapping(value = "/createTask", method = RequestMethod.POST) public ResponseEntity<Object> createOrUpdateTask(@RequestBody CreateTaskRequest task) { + log.info("receive http proto create task:{}",task); CreateTaskResponse createTaskResponse = taskService.createTask(task); + log.info("receive http proto create task result:{}",createTaskResponse); return ResponseEntity.ok(JsonUtils.toJSONString(Response.success(createTaskResponse))); } + + @RequestMapping(value = "/reportVerify", method = RequestMethod.POST) + public ResponseEntity<Object> reportVerify(@RequestBody ReportVerifyRequest request) { + log.info("receive http proto report verify request:{}", request); + boolean result = verifyService.reportVerifyRecord(request); + log.info("receive http proto report verify result:{}", result); + if (result) { + return ResponseEntity.ok("report verify success.request:" + JsonUtils.toJSONString(request)); + } else { + return ResponseEntity.internalServerError().body("report verify success.request:" + JsonUtils.toJSONString(request)); + } + } + public boolean deleteTask(Long id) { return false; } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java index f1de76496..124eca426 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java @@ -20,6 +20,7 @@ package org.apache.eventmesh.admin.server.web.db; import org.apache.eventmesh.common.EventMeshThreadFactory; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -39,17 +40,34 @@ public class DBThreadPool { new LinkedBlockingQueue<>(1000), new EventMeshThreadFactory("admin-server-db"), new ThreadPoolExecutor.DiscardOldestPolicy()); + + private final ScheduledThreadPoolExecutor checkScheduledExecutor = + new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new EventMeshThreadFactory("admin-server-check-scheduled"), + new ThreadPoolExecutor.DiscardOldestPolicy()); + @PreDestroy private void destroy() { if (!executor.isShutdown()) { try { executor.shutdown(); if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { - log.info("wait heart beat handler thread pool shutdown timeout, it will shutdown immediately"); + log.info("wait handler thread pool shutdown timeout, it will shutdown immediately"); executor.shutdownNow(); } } catch (InterruptedException e) { - log.warn("wait heart beat handler thread pool shutdown fail"); + log.warn("wait handler thread pool shutdown fail"); + } + } + + if (!checkScheduledExecutor.isShutdown()) { + try { + checkScheduledExecutor.shutdown(); + if (!checkScheduledExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + log.info("wait scheduled thread pool shutdown timeout, it will shutdown immediately"); + checkScheduledExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + log.warn("wait scheduled thread pool shutdown fail"); } } } @@ -57,4 +75,8 @@ public class DBThreadPool { public ThreadPoolExecutor getExecutors() { return executor; } + + public ScheduledThreadPoolExecutor getCheckExecutor() { + return checkScheduledExecutor; + } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java index 5425c5c57..9d3e817ff 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java @@ -37,6 +37,8 @@ public class EventMeshVerify implements Serializable { private String taskID; + private String jobID; + private String recordID; private String recordSig; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java index b377bcddd..3392084c2 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java @@ -56,7 +56,7 @@ public class FetchJobRequestHandler extends BaseRequestHandler<FetchJobRequest, config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf())); config.setSourceConnectorDesc(detail.getSourceConnectorDesc()); config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf())); - config.setSourceConnectorDesc(detail.getSinkConnectorDesc()); + config.setSinkConnectorDesc(detail.getSinkConnectorDesc()); response.setConnectorConfig(config); response.setTransportType(detail.getTransportType()); response.setState(detail.getState()); diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java similarity index 54% copy from eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java copy to eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java index 39963494c..defec3f8e 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java @@ -17,33 +17,40 @@ package org.apache.eventmesh.admin.server.web.handler.impl; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; -import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService; +import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.exception.ErrorCode; -import org.apache.eventmesh.common.remote.request.ReportVerifyRequest; +import org.apache.eventmesh.common.remote.request.ReportJobRequest; import org.apache.eventmesh.common.remote.response.SimpleResponse; - -import org.apache.commons.lang3.StringUtils; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import lombok.extern.slf4j.Slf4j; - @Component @Slf4j -public class ReportVerifyHandler extends BaseRequestHandler<ReportVerifyRequest, SimpleResponse> { +public class ReportJobRequestHandler extends BaseRequestHandler<ReportJobRequest, SimpleResponse> { + @Autowired - private VerifyBizService verifyService; + JobInfoBizService jobInfoBizService; @Override - protected SimpleResponse handler(ReportVerifyRequest request, Metadata metadata) { - if (StringUtils.isAnyBlank(request.getTaskID(), request.getRecordSig(), request.getRecordID(), request.getConnectorStage())) { - log.info("report verify request [{}] illegal", request); - return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task id, sign, record id or stage is none"); + public SimpleResponse handler(ReportJobRequest request, Metadata metadata) { + log.info("receive report job request:{}", request); + if (StringUtils.isBlank(request.getJobID())) { + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, it's empty"); + } + EventMeshJobInfo jobInfo = jobInfoBizService.getJobInfo(request.getJobID()); + if (jobInfo == null) { + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, not exist target job,jobID:" + request.getJobID()); + } + boolean result = jobInfoBizService.updateJobState(jobInfo.getJobID(), request.getState()); + if (result) { + return SimpleResponse.success(); + } else { + return SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "update job failed."); } - return verifyService.reportVerifyRecord(request) ? SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save verify " - + "request fail"); } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java index 5e2a96826..78335d419 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java @@ -48,6 +48,7 @@ public class ReportPositionHandler extends BaseRequestHandler<ReportPositionRequ @Override protected SimpleResponse handler(ReportPositionRequest request, Metadata metadata) { + log.info("receive report position request:{}", request); if (StringUtils.isBlank(request.getJobID())) { log.info("request [{}] illegal job id", request); return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, it's empty"); diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java index 39963494c..99defbe7c 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java @@ -17,19 +17,23 @@ package org.apache.eventmesh.admin.server.web.handler.impl; +import org.apache.eventmesh.admin.server.AdminServerProperties; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; +import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService; import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.request.ReportVerifyRequest; import org.apache.eventmesh.common.remote.response.SimpleResponse; - import org.apache.commons.lang3.StringUtils; - import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; - import lombok.extern.slf4j.Slf4j; +import org.springframework.web.client.RestTemplate; +import java.util.List; +import java.util.Random; @Component @Slf4j @@ -37,13 +41,45 @@ public class ReportVerifyHandler extends BaseRequestHandler<ReportVerifyRequest, @Autowired private VerifyBizService verifyService; + @Autowired + JobInfoBizService jobInfoBizService; + + @Autowired + private AdminServerProperties properties; + @Override protected SimpleResponse handler(ReportVerifyRequest request, Metadata metadata) { - if (StringUtils.isAnyBlank(request.getTaskID(), request.getRecordSig(), request.getRecordID(), request.getConnectorStage())) { + if (StringUtils.isAnyBlank(request.getTaskID(), request.getJobID(), request.getRecordSig(), request.getRecordID(), request.getConnectorStage())) { log.info("report verify request [{}] illegal", request); - return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task id, sign, record id or stage is none"); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task id,job id, sign, record id or stage is none"); + } + + String jobID = request.getJobID(); + EventMeshJobInfo jobInfo = jobInfoBizService.getJobInfo(jobID); + if (jobInfo == null || StringUtils.isBlank(jobInfo.getFromRegion())) { + log.info("report verify job info [{}] illegal", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "job info is null or fromRegion is blank,job id:" + jobID); + } + + String fromRegion = jobInfo.getFromRegion(); + String localRegion = properties.getRegion(); + log.info("report verify request from region:{},localRegion:{},request:{}", fromRegion, localRegion, request); + if(fromRegion.equalsIgnoreCase(localRegion)){ + return verifyService.reportVerifyRecord(request) ? SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save verify " + + "request fail"); + } else { + log.info("start transfer report verify to from region admin server. from region:{}", fromRegion); + List<String> adminServerList = properties.getAdminServerList().get(fromRegion); + if (adminServerList == null || adminServerList.isEmpty()) { + throw new RuntimeException("No admin server available for region: " + fromRegion); + } + String targetUrl = adminServerList.get(new Random().nextInt(adminServerList.size())) + "/eventmesh/admin/reportVerify"; + RestTemplate restTemplate = new RestTemplate(); + ResponseEntity<String> response = restTemplate.postForEntity(targetUrl, request, String.class); + if (!response.getStatusCode().is2xxSuccessful()) { + return SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save verify request fail,code:" + response.getStatusCode() + ",msg:" + response.getBody()); + } + return SimpleResponse.success(); } - return verifyService.reportVerifyRecord(request) ? SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save verify " - + "request fail"); } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java index 0657383e2..70abececb 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java @@ -19,15 +19,19 @@ package org.apache.eventmesh.admin.server.web.service.job; import org.apache.eventmesh.admin.server.AdminServerProperties; import org.apache.eventmesh.admin.server.AdminServerRuntimeException; +import org.apache.eventmesh.admin.server.web.db.DBThreadPool; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; import org.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoExtService; import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService; import org.apache.eventmesh.admin.server.web.pojo.JobDetail; import org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService; import org.apache.eventmesh.admin.server.web.service.position.PositionBizService; import org.apache.eventmesh.common.config.connector.Config; +import org.apache.eventmesh.common.remote.JobState; import org.apache.eventmesh.common.remote.TaskState; import org.apache.eventmesh.common.remote.TransportType; import org.apache.eventmesh.common.remote.datasource.DataSource; @@ -35,20 +39,18 @@ import org.apache.eventmesh.common.remote.datasource.DataSourceType; import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.request.CreateOrUpdateDataSourceReq; import org.apache.eventmesh.common.utils.JsonUtils; - import org.apache.commons.lang3.StringUtils; - +import java.time.Duration; import java.util.LinkedList; import java.util.List; import java.util.UUID; - +import java.util.concurrent.TimeUnit; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; - import com.baomidou.mybatisplus.core.toolkit.Wrappers; - import lombok.extern.slf4j.Slf4j; +import javax.annotation.PostConstruct; /** * for table 'event_mesh_job_info' db operation @@ -75,13 +77,41 @@ public class JobInfoBizService { @Autowired private AdminServerProperties properties; + @Autowired + EventMeshRuntimeHeartbeatService heartbeatService; + + private final long heatBeatPeriod = Duration.ofMillis(5000).toMillis(); + + @Autowired + DBThreadPool executor; + + @PostConstruct + public void init() { + log.info("init check job info scheduled task."); + executor.getCheckExecutor().scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + checkJobInfo(); + } + }, 10, 10, TimeUnit.SECONDS); + } + public boolean updateJobState(String jobID, TaskState state) { if (jobID == null || state == null) { return false; } EventMeshJobInfo jobInfo = new EventMeshJobInfo(); jobInfo.setJobState(state.name()); - return jobInfoService.update(jobInfo, Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("state", TaskState.DELETE.name())); + return jobInfoService.update(jobInfo, Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("jobState", JobState.DELETE.name())); + } + + public boolean updateJobState(String jobID, JobState state) { + if (jobID == null || state == null) { + return false; + } + EventMeshJobInfo jobInfo = new EventMeshJobInfo(); + jobInfo.setJobState(state.name()); + return jobInfoService.update(jobInfo, Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("jobState", JobState.DELETE.name())); } @Transactional @@ -114,7 +144,8 @@ public class JobInfoBizService { source.setOperator(job.getCreateUid()); source.setRegion(job.getSourceDataSource().getRegion()); source.setDesc(job.getSourceConnectorDesc()); - source.setConfig(job.getSourceDataSource().getConf()); + Config sourceConfig = job.getSourceDataSource().getConf(); + source.setConfig(sourceConfig); source.setConfigClass(job.getSourceDataSource().getConfClazz().getName()); EventMeshDataSource createdSource = dataSourceBizService.createDataSource(source); entity.setSourceData(createdSource.getId()); @@ -124,7 +155,8 @@ public class JobInfoBizService { sink.setOperator(job.getCreateUid()); sink.setRegion(job.getSinkDataSource().getRegion()); sink.setDesc(job.getSinkConnectorDesc()); - sink.setConfig(job.getSinkDataSource().getConf()); + Config sinkConfig = job.getSinkDataSource().getConf(); + sink.setConfig(sinkConfig); sink.setConfigClass(job.getSinkDataSource().getConfClazz().getName()); EventMeshDataSource createdSink = dataSourceBizService.createDataSource(sink); entity.setTargetData(createdSink.getId()); @@ -134,7 +166,7 @@ public class JobInfoBizService { int changed = jobInfoExtService.batchSave(entityList); if (changed != jobs.size()) { throw new AdminServerRuntimeException(ErrorCode.INTERNAL_ERR, String.format("create [%d] jobs of not match expect [%d]", - changed, jobs.size())); + changed, jobs.size())); } return entityList; } @@ -168,7 +200,7 @@ public class JobInfoBizService { detail.setSourceConnectorDesc(source.getDescription()); if (source.getDataType() != null) { detail.setPositions(positionBizService.getPositionByJobID(job.getJobID(), - DataSourceType.getDataSourceType(source.getDataType()))); + DataSourceType.getDataSourceType(source.getDataType()))); } } @@ -195,6 +227,35 @@ public class JobInfoBizService { detail.setTransportType(TransportType.getTransportType(job.getTransportType())); return detail; } + + public EventMeshJobInfo getJobInfo(String jobID) { + if (jobID == null) { + return null; + } + EventMeshJobInfo job = jobInfoService.getOne(Wrappers.<EventMeshJobInfo>query().eq("jobID", jobID)); + return job; + } + + public void checkJobInfo() { + List<EventMeshJobInfo> eventMeshJobInfoList = jobInfoService.list(Wrappers.<EventMeshJobInfo>query().eq("jobState", JobState.RUNNING.name())); + log.info("start check job info.to check job size:{}", eventMeshJobInfoList.size()); + for (EventMeshJobInfo jobInfo : eventMeshJobInfoList) { + String jobID = jobInfo.getJobID(); + if (StringUtils.isEmpty(jobID)) { + continue; + } + EventMeshRuntimeHeartbeat heartbeat = heartbeatService.getOne(Wrappers.<EventMeshRuntimeHeartbeat>query().eq("jobID", jobID)); + if (heartbeat == null) { + continue; + } + // if last heart beat update time have delay three period.print job heart beat delay warn + long currentTimeStamp = System.currentTimeMillis(); + if (currentTimeStamp - heartbeat.getUpdateTime().getTime() > 3 * heatBeatPeriod) { + log.warn("current job heart heart has delay.jobID:{},currentTimeStamp:{},last update time:{}", jobID, currentTimeStamp, heartbeat.getUpdateTime()); + } + } + } + } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java new file mode 100644 index 000000000..b0f89ec03 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.service.position.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService; +import org.apache.eventmesh.admin.server.web.service.position.PositionHandler; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; +import org.apache.eventmesh.common.remote.offset.RecordPosition; +import org.apache.eventmesh.common.remote.request.FetchPositionRequest; +import org.apache.eventmesh.common.remote.request.ReportPositionRequest; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; + +@Component +@Slf4j +public class HttpPositionHandler extends PositionHandler { + @Autowired + EventMeshPositionReporterHistoryService historyService; + + @Override + protected DataSourceType getSourceType() { + return DataSourceType.HTTP; + } + + @Override + public boolean handler(ReportPositionRequest request, Metadata metadata) { + log.info("receive http position report request:{}", request); + // mock wemq postion report store + return true; + } + + @Override + public List<RecordPosition> handler(FetchPositionRequest request, Metadata metadata) { + // mock http position fetch request + List<RecordPosition> recordPositionList = new ArrayList<>(); + return recordPositionList; + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java index 74f208b19..9d648e0a7 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java @@ -35,6 +35,7 @@ public class VerifyBizService { verify.setRecordSig(request.getRecordSig()); verify.setPosition(request.getPosition()); verify.setTaskID(request.getTaskID()); + verify.setJobID(request.getJobID()); verify.setConnectorName(request.getConnectorName()); verify.setConnectorStage(request.getConnectorStage()); return verifyService.save(verify); diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java index 939c9d8d6..3f3f609a1 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java @@ -131,7 +131,7 @@ public class ConfigService { } else { filePath = path.startsWith(FILE_PATH_PREFIX) ? path.substring(FILE_PATH_PREFIX.length()) : this.configPath + path; } - + filePath = normalizeFilePath(filePath); if (filePath.contains(".jar")) { try (final InputStream inputStream = getClass().getResourceAsStream(Objects.requireNonNull(resourceUrl))) { if (inputStream == null) { @@ -152,6 +152,15 @@ public class ConfigService { return (T) object; } + private String normalizeFilePath(String filePath) { + if (System.getProperty("os.name").toLowerCase().contains("win")) { + if (filePath.startsWith("/")) { + filePath = filePath.substring(1); + } + } + return filePath; + } + private void populateConfig(Object object, Class<?> clazz, Config config) throws NoSuchFieldException, IOException, IllegalAccessException { ConfigInfo configInfo = new ConfigInfo(); diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpRetryConfig.java similarity index 95% rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpRetryConfig.java index 08c3a323e..319732a87 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpRetryConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.connector.http.sink.config; +package org.apache.eventmesh.common.config.connector.http; import lombok.Data; diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpSinkConfig.java similarity index 94% rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpSinkConfig.java index 5997b90b7..3c429f335 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpSinkConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.connector.http.sink.config; +package org.apache.eventmesh.common.config.connector.http; import org.apache.eventmesh.common.config.connector.SinkConfig; diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpWebhookConfig.java similarity index 95% rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpWebhookConfig.java index f15bac456..96b9e0982 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpWebhookConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.connector.http.sink.config; +package org.apache.eventmesh.common.config.connector.http; import lombok.Data; diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java similarity index 84% rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java index 9bb338cce..ccebe5a99 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java @@ -15,9 +15,8 @@ * limitations under the License. */ -package org.apache.eventmesh.connector.http.sink.config; +package org.apache.eventmesh.common.config.connector.http; -import io.vertx.core.http.HttpClientOptions; import lombok.Data; @@ -29,19 +28,19 @@ public class SinkConnectorConfig { private String[] urls; // keepAlive, default true - private boolean keepAlive = HttpClientOptions.DEFAULT_KEEP_ALIVE; + private boolean keepAlive = true; // timeunit: ms, default 60000ms - private int keepAliveTimeout = HttpClientOptions.DEFAULT_KEEP_ALIVE_TIMEOUT * 1000; // Keep units consistent + private int keepAliveTimeout = 60 * 1000; // Keep units consistent // timeunit: ms, default 5000ms, recommended scope: 5000ms - 10000ms private int connectionTimeout = 5000; // timeunit: ms, default 5000ms - private int idleTimeout; + private int idleTimeout = 5000; // maximum number of HTTP/1 connections a client will pool, default 5 - private int maxConnectionPoolSize = HttpClientOptions.DEFAULT_MAX_POOL_SIZE; + private int maxConnectionPoolSize = 5; // retry config private HttpRetryConfig retryConfig = new HttpRetryConfig(); diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java index b7f075e6d..58d910bf2 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java @@ -27,7 +27,7 @@ public class SourceConnectorConfig { private String connectorName; - private String path; + private String path = "/"; private int port; @@ -51,11 +51,11 @@ public class SourceConnectorConfig { private int batchSize = 10; // protocol, default CloudEvent - private String protocol = "CloudEvent"; + private String protocol = "Common"; // extra config, e.g. GitHub secret private Map<String, String> extraConfig = new HashMap<>(); // data consistency enabled, default true - private boolean dataConsistencyEnabled = true; + private boolean dataConsistencyEnabled = false; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java new file mode 100644 index 000000000..53d20f2ac --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote; + +import lombok.ToString; + +import java.util.HashMap; +import java.util.Map; + +@ToString +public enum JobState { + INIT, RUNNING, COMPLETE, DELETE, FAIL; + private static final JobState[] STATES_NUM_INDEX = JobState.values(); + private static final Map<String, JobState> STATES_NAME_INDEX = new HashMap<>(); + static { + for (JobState jobState : STATES_NUM_INDEX) { + STATES_NAME_INDEX.put(jobState.name(), jobState); + } + } + + public static JobState fromIndex(Integer index) { + if (index == null || index < 0 || index >= STATES_NUM_INDEX.length) { + return null; + } + + return STATES_NUM_INDEX[index]; + } + + public static JobState fromIndex(String index) { + if (index == null || index.isEmpty()) { + return null; + } + + return STATES_NAME_INDEX.get(index); + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java index 82e7bc021..6b4359839 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java @@ -35,6 +35,7 @@ public enum TransportType { HTTP_REDIS(DataSourceType.HTTP, DataSourceType.REDIS), HTTP_ROCKETMQ(DataSourceType.HTTP, DataSourceType.ROCKETMQ), REDIS_MQ(DataSourceType.REDIS, DataSourceType.ROCKETMQ), + HTTP_HTTP(DataSourceType.HTTP, DataSourceType.HTTP), ; private static final Map<String, TransportType> INDEX_TYPES = new HashMap<>(); private static final TransportType[] TYPES = TransportType.values(); diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java similarity index 80% copy from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java copy to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java index cd541949f..9e7444459 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java @@ -20,21 +20,17 @@ package org.apache.eventmesh.common.remote.request; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; +import org.apache.eventmesh.common.remote.JobState; @Data @EqualsAndHashCode(callSuper = true) @ToString -public class ReportVerifyRequest extends BaseRemoteRequest { +public class ReportJobRequest extends BaseRemoteRequest { - private String taskID; + private String jobID; - private String recordID; + private JobState state; - private String recordSig; + private String address; - private String connectorName; - - private String connectorStage; - - private String position; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java index cd541949f..bd38881c3 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java @@ -28,6 +28,8 @@ public class ReportVerifyRequest extends BaseRemoteRequest { private String taskID; + private String jobID; + private String recordID; private String recordSig; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java index 9e9cea304..f2328541c 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java @@ -58,6 +58,10 @@ public class JsonUtils { return OBJECT_MAPPER.convertValue(fromValue, toValueType); } + public static <T> T convertValue(Object fromValue, TypeReference<T> toValueTypeRef) { + return OBJECT_MAPPER.convertValue(fromValue, toValueTypeRef); + } + public static <T> T mapToObject(Map<String, Object> map, Class<T> beanClass) { if (map == null) { return null; @@ -177,6 +181,13 @@ public class JsonUtils { } } + public static <T> T parseTypeReferenceObject(Object object, TypeReference<T> typeReference) { + if (object == null) { + return null; + } + return convertValue(object, typeReference); + } + public static <T> T parseTypeReferenceObject(byte[] text, TypeReference<T> typeReference) { try { return OBJECT_MAPPER.readValue(text, typeReference); diff --git a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload index 82d5c94dd..433cf57ed 100644 --- a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload +++ b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload @@ -16,6 +16,7 @@ org.apache.eventmesh.common.remote.request.FetchJobRequest org.apache.eventmesh.common.remote.response.FetchJobResponse org.apache.eventmesh.common.remote.request.ReportPositionRequest +org.apache.eventmesh.common.remote.request.ReportJobRequest org.apache.eventmesh.common.remote.request.ReportVerifyRequest org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest org.apache.eventmesh.common.remote.request.FetchPositionRequest diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java index 49fb10dd3..57c9e5645 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java @@ -19,6 +19,7 @@ package org.apache.eventmesh.connector.canal.sink.connector; import org.apache.eventmesh.common.config.connector.Config; import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.canal.CanalConnectRecord; import org.apache.eventmesh.connector.canal.DatabaseConnection; import org.apache.eventmesh.connector.canal.SqlUtils; @@ -75,6 +76,7 @@ import org.springframework.transaction.support.TransactionCallback; import org.springframework.util.CollectionUtils; import com.alibaba.otter.canal.common.utils.NamedThreadFactory; +import com.fasterxml.jackson.core.type.TypeReference; import lombok.extern.slf4j.Slf4j; @@ -165,8 +167,11 @@ public class CanalSinkConnector implements Sink, ConnectorCreateService<Sink> { DbLoadContext context = new DbLoadContext(); for (ConnectRecord connectRecord : sinkRecords) { List<CanalConnectRecord> canalConnectRecordList = new ArrayList<>(); + + List<CanalConnectRecord> canalConnectRecords = convertToCanalConnectRecord(connectRecord); + // deep copy connectRecord data - for (CanalConnectRecord record : (List<CanalConnectRecord>) connectRecord.getData()) { + for (CanalConnectRecord record : canalConnectRecords) { canalConnectRecordList.add(SerializationUtils.clone(record)); } canalConnectRecordList = filterRecord(canalConnectRecordList); @@ -302,7 +307,8 @@ public class CanalSinkConnector implements Sink, ConnectorCreateService<Sink> { private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, ConnectRecord connectRecord) { int batchIndex = connectRecord.getExtension("batchIndex", Integer.class); int totalBatches = connectRecord.getExtension("totalBatches", Integer.class); - List<CanalConnectRecord> canalConnectRecordList = (List<CanalConnectRecord>) connectRecord.getData(); + List<CanalConnectRecord> canalConnectRecordList = convertToCanalConnectRecord(connectRecord); + String gtid = canalConnectRecordList.get(0).getCurrentGtid(); GtidBatchManager.addBatch(gtid, batchIndex, totalBatches, canalConnectRecordList); // check whether the batch is complete @@ -357,6 +363,20 @@ public class CanalSinkConnector implements Sink, ConnectorCreateService<Sink> { } } + private List<CanalConnectRecord> convertToCanalConnectRecord(ConnectRecord connectRecord) { + List<CanalConnectRecord> canalConnectRecordList; + try { + canalConnectRecordList = + JsonUtils.parseTypeReferenceObject((byte[]) connectRecord.getData(), new TypeReference<List<CanalConnectRecord>>() { + }); + } catch (Exception e) { + log.error("Failed to parse the canalConnectRecords.", e); + connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord, e)); + throw new RuntimeException("Failed to parse the canalConnectRecords.", e); + } + return canalConnectRecordList; + } + private List<List<CanalConnectRecord>> split(List<CanalConnectRecord> records) { List<List<CanalConnectRecord>> result = new ArrayList<>(); if (records == null || records.isEmpty()) { diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index ea5ccdeed..49bc05878 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -36,6 +36,7 @@ import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.commons.lang3.StringUtils; import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -312,7 +313,7 @@ public class CanalSourceConnector implements Source, ConnectorCreateService<Sour entries = new ArrayList<>(message.getRawEntries().size()); for (ByteString entry : message.getRawEntries()) { try { - entries.add(CanalEntry.Entry.parseFrom(entry)); + entries.add(Entry.parseFrom(entry)); } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); } @@ -356,7 +357,7 @@ public class CanalSourceConnector implements Source, ConnectorCreateService<Sour connectRecord.addExtension("messageId", String.valueOf(message.getId())); connectRecord.addExtension("batchIndex", i); connectRecord.addExtension("totalBatches", splitLists.size()); - connectRecord.setData(splitLists.get(i)); + connectRecord.setData(JsonUtils.toJSONString(splitLists.get(i)).getBytes(StandardCharsets.UTF_8)); result.add(connectRecord); } } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java index 9b6038bde..3df110f2e 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java @@ -18,8 +18,8 @@ package org.apache.eventmesh.connector.http.sink; import org.apache.eventmesh.common.config.connector.Config; -import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig; -import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.common.config.connector.http.HttpSinkConfig; +import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig; import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler; import org.apache.eventmesh.connector.http.sink.handler.impl.CommonHttpSinkHandler; import org.apache.eventmesh.connector.http.sink.handler.impl.HttpSinkHandlerRetryWrapper; diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java index 95b40afe9..9c8b1ce67 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java @@ -19,9 +19,11 @@ package org.apache.eventmesh.connector.http.sink.data; import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.KeyValue; import java.io.Serializable; import java.time.LocalDateTime; +import java.util.Base64; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -58,10 +60,9 @@ public class HttpConnectRecord implements Serializable { */ private String eventId; - /** - * The ConnectRecord to be sent - */ - private ConnectRecord data; + private Object data; + + private KeyValue extensions; @Override public String toString() { @@ -71,6 +72,7 @@ public class HttpConnectRecord implements Serializable { + ", type='" + type + ", eventId='" + eventId + ", data=" + data + + ", extensions=" + extensions + '}'; } @@ -83,16 +85,34 @@ public class HttpConnectRecord implements Serializable { public static HttpConnectRecord convertConnectRecord(ConnectRecord record, String type) { Map<String, ?> offsetMap = new HashMap<>(); if (record != null && record.getPosition() != null && record.getPosition().getRecordOffset() != null) { - offsetMap = ((HttpRecordOffset) record.getPosition().getRecordOffset()).getOffsetMap(); + if (HttpRecordOffset.class.equals(record.getPosition().getRecordOffsetClazz())) { + offsetMap = ((HttpRecordOffset) record.getPosition().getRecordOffset()).getOffsetMap(); + } } String offset = "0"; if (!offsetMap.isEmpty()) { offset = offsetMap.values().iterator().next().toString(); } - return HttpConnectRecord.builder() - .type(type) - .eventId(type + "-" + offset) - .data(record) - .build(); + if (record.getData() instanceof byte[]) { + String data = Base64.getEncoder().encodeToString((byte[]) record.getData()); + record.addExtension("isBase64", true); + return HttpConnectRecord.builder() + .type(type) + .createTime(LocalDateTime.now()) + .eventId(type + "-" + offset) + .data(data) + .extensions(record.getExtensions()) + .build(); + } else { + record.addExtension("isBase64", false); + return HttpConnectRecord.builder() + .type(type) + .createTime(LocalDateTime.now()) + .eventId(type + "-" + offset) + .data(record.getData()) + .extensions(record.getExtensions()) + .build(); + } } + } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java index 36d01115b..5c868f4aa 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.connector.http.sink.handler; -import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig; import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent; import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext; @@ -81,7 +81,7 @@ public abstract class AbstractHttpSinkHandler implements HttpSinkHandler { attributes.put(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId(), retryEvent); // deliver the record - deliver(url, httpConnectRecord, attributes); + deliver(url, httpConnectRecord, attributes, record); } } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java index 1731809ab..697eb407d 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java @@ -33,14 +33,14 @@ import io.vertx.ext.web.client.HttpResponse; * * <p>Any class that needs to process ConnectRecords via HTTP or HTTPS should implement this interface. * Implementing classes must provide implementations for the {@link #start()}, {@link #handle(ConnectRecord)}, - * {@link #deliver(URI, HttpConnectRecord, Map)}, and {@link #stop()} methods.</p> + * {@link #deliver(URI, HttpConnectRecord, Map, ConnectRecord)}, and {@link #stop()} methods.</p> * * <p>Implementing classes should ensure thread safety and handle HTTP/HTTPS communication efficiently. * The {@link #start()} method initializes any necessary resources for HTTP/HTTPS communication. The {@link #handle(ConnectRecord)} method processes a - * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, HttpConnectRecord, Map)} method processes HttpConnectRecord on specified + * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, HttpConnectRecord, Map, ConnectRecord)} method processes HttpConnectRecord on specified * URL while returning its own processing logic {@link #stop()} method releases any resources used for HTTP/HTTPS communication.</p> * - * <p>It's recommended to handle exceptions gracefully within the {@link #deliver(URI, HttpConnectRecord, Map)} method + * <p>It's recommended to handle exceptions gracefully within the {@link #deliver(URI, HttpConnectRecord, Map, ConnectRecord)} method * to prevent message loss or processing interruptions.</p> */ public interface HttpSinkHandler { @@ -66,7 +66,7 @@ public interface HttpSinkHandler { * @param attributes additional attributes to be used in processing * @return processing chain */ - Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord, Map<String, Object> attributes); + Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord, Map<String, Object> attributes, ConnectRecord connectRecord); /** * Cleans up and releases resources used by the HTTP/HTTPS handler. This method should be called when the handler is no longer needed. diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java index 090784745..d498d449c 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java @@ -17,20 +17,29 @@ package org.apache.eventmesh.connector.http.sink.handler.impl; +import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig; import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset; -import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; + +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent; import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext; import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler; import org.apache.eventmesh.connector.http.util.HttpUtils; import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback; import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import java.net.URI; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; import io.netty.handler.codec.http.HttpHeaderNames; @@ -104,22 +113,24 @@ public class CommonHttpSinkHandler extends AbstractHttpSinkHandler { * @return processing chain */ @Override - public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord, Map<String, Object> attributes) { + public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord, Map<String, Object> attributes, ConnectRecord connectRecord) { // create headers + Map<String, Object> extensionMap = new HashMap<>(); + Set<String> extensionKeySet = httpConnectRecord.getExtensions().keySet(); + for (String extensionKey : extensionKeySet) { + Object v = httpConnectRecord.getExtensions().getObject(extensionKey); + extensionMap.put(extensionKey, v); + } + MultiMap headers = HttpHeaders.headers() .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8") - .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8"); - + .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8") + .set("extension", JsonUtils.toJSONString(extensionMap)); // get timestamp and offset - Long timestamp = httpConnectRecord.getData().getTimestamp(); - Map<String, ?> offset = null; - try { - // May throw NullPointerException. - offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap(); - } catch (NullPointerException e) { - // ignore null pointer exception - } - final Map<String, ?> finalOffset = offset; + Long timestamp = httpConnectRecord.getCreateTime() + .atZone(ZoneId.systemDefault()) + .toInstant() + .toEpochMilli(); // send the request return this.webClient.post(url.getPath()) @@ -127,40 +138,38 @@ public class CommonHttpSinkHandler extends AbstractHttpSinkHandler { .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort()) .putHeaders(headers) .ssl(Objects.equals(url.getScheme(), "https")) - .sendJson(httpConnectRecord) + .sendJson(httpConnectRecord.getData()) .onSuccess(res -> { - log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset); + log.info("Request sent successfully. Record: timestamp={}", timestamp); Exception e = null; // log the response if (HttpUtils.is2xxSuccessful(res.statusCode())) { if (log.isDebugEnabled()) { - log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}", - res.statusCode(), timestamp, finalOffset, res.bodyAsString()); + log.debug("Received successful response: statusCode={}. Record: timestamp={}, responseBody={}", + res.statusCode(), timestamp, res.bodyAsString()); } else { - log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, - finalOffset); + log.info("Received successful response: statusCode={}. Record: timestamp={}", res.statusCode(), timestamp); } } else { if (log.isDebugEnabled()) { - log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}", - res.statusCode(), timestamp, finalOffset, res.bodyAsString()); + log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, responseBody={}", + res.statusCode(), timestamp, res.bodyAsString()); } else { - log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, - finalOffset); + log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}", res.statusCode(), timestamp); } e = new RuntimeException("Unexpected HTTP response code: " + res.statusCode()); } // try callback - tryCallback(httpConnectRecord, e, attributes); + tryCallback(httpConnectRecord, e, attributes, connectRecord); }).onFailure(err -> { - log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err); + log.error("Request failed to send. Record: timestamp={}", timestamp, err); // try callback - tryCallback(httpConnectRecord, err, attributes); + tryCallback(httpConnectRecord, err, attributes, connectRecord); }); } @@ -171,7 +180,7 @@ public class CommonHttpSinkHandler extends AbstractHttpSinkHandler { * @param e the exception thrown during the request, may be null * @param attributes additional attributes to be used in processing */ - private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map<String, Object> attributes) { + private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map<String, Object> attributes, ConnectRecord record) { // get the retry event HttpRetryEvent retryEvent = getAndUpdateRetryEvent(attributes, httpConnectRecord, e); @@ -180,7 +189,6 @@ public class CommonHttpSinkHandler extends AbstractHttpSinkHandler { if (multiHttpRequestContext.getRemainingRequests() == 0) { // do callback - ConnectRecord record = httpConnectRecord.getData(); if (record.getCallback() == null) { if (log.isDebugEnabled()) { log.warn("ConnectRecord callback is null. Ignoring callback. {}", record); diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java index 268d0a0d6..9d7970a84 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java @@ -17,8 +17,8 @@ package org.apache.eventmesh.connector.http.sink.handler.impl; -import org.apache.eventmesh.connector.http.sink.config.HttpRetryConfig; -import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.common.config.connector.http.HttpRetryConfig; +import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig; import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent; import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler; diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java index ff8f69d45..7031438da 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java @@ -19,8 +19,8 @@ package org.apache.eventmesh.connector.http.sink.handler.impl; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue; -import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig; -import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.common.config.connector.http.HttpWebhookConfig; +import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig; import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata; import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord; @@ -54,6 +54,7 @@ import com.alibaba.fastjson2.JSONWriter; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; /** * Extends CommonHttpSinkHandler to provide additional functionality for handling webhook features, including sending requests to callback servers, @@ -209,9 +210,9 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler { * @return processing chain */ @Override - public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord, Map<String, Object> attributes) { + public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord, Map<String, Object> attributes, ConnectRecord connectRecord) { // send the request - Future<HttpResponse<Buffer>> responseFuture = super.deliver(url, httpConnectRecord, attributes); + Future<HttpResponse<Buffer>> responseFuture = super.deliver(url, httpConnectRecord, attributes, connectRecord); // store the received data return responseFuture.onComplete(arr -> { // get tryEvent from attributes @@ -260,8 +261,7 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler { .code(response != null ? response.statusCode() : -1) .message(msg) .receivedTime(LocalDateTime.now()) - .httpRecordId(httpConnectRecord.getHttpRecordId()) - .recordId(httpConnectRecord.getData().getRecordId()) + .recordId(httpConnectRecord.getHttpRecordId()) .retryNum(retryEvent.getCurrentRetries()) .build(); } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java index 2fe7399da..b6432672a 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java @@ -20,6 +20,7 @@ package org.apache.eventmesh.connector.http.source.data; import java.io.Serializable; import java.util.Map; +import io.vertx.ext.web.RoutingContext; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -42,4 +43,6 @@ public class WebhookRequest implements Serializable { private Object payload; + private RoutingContext routingContext; + } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java index 738f04523..e3ceda55e 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java @@ -19,21 +19,23 @@ package org.apache.eventmesh.connector.http.source.protocol.impl; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue; import org.apache.eventmesh.connector.http.source.data.CommonResponse; import org.apache.eventmesh.connector.http.source.data.WebhookRequest; import org.apache.eventmesh.connector.http.source.protocol.Protocol; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import java.util.Base64; import java.util.Map; import java.util.stream.Collectors; import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.http.HttpMethod; +import io.vertx.core.json.JsonObject; import io.vertx.ext.web.Route; import io.vertx.ext.web.handler.BodyHandler; - import lombok.extern.slf4j.Slf4j; /** @@ -69,12 +71,13 @@ public class CommonProtocol implements Protocol { .handler(BodyHandler.create()) .handler(ctx -> { // Get the payload - String payloadStr = ctx.body().asString(Constants.DEFAULT_CHARSET.toString()); + Object payload = ctx.body().asString(Constants.DEFAULT_CHARSET.toString()); + payload = JsonUtils.parseObject(payload.toString(), String.class); // Create and store the webhook request Map<String, String> headerMap = ctx.request().headers().entries().stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - WebhookRequest webhookRequest = new WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, payloadStr); + WebhookRequest webhookRequest = new WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, payload, ctx); if (!queue.offer(webhookRequest)) { throw new IllegalStateException("Failed to store the request."); } @@ -110,7 +113,27 @@ public class CommonProtocol implements Protocol { ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis(), request.getPayload()); connectRecord.addExtension("source", request.getProtocolName()); connectRecord.addExtension("url", request.getUrl()); - connectRecord.addExtension("headers", request.getHeaders()); + request.getHeaders().forEach((k, v) -> { + if (k.equalsIgnoreCase("extension")) { + JsonObject extension = new JsonObject(v); + extension.forEach(e -> connectRecord.addExtension(e.getKey(), e.getValue())); + } + }); + // check recordUniqueId + if (!connectRecord.getExtensions().containsKey("recordUniqueId")) { + connectRecord.addExtension("recordUniqueId", connectRecord.getRecordId()); + } + + // check data + if (connectRecord.getExtension("isBase64") != null) { + if (Boolean.parseBoolean(connectRecord.getExtension("isBase64"))) { + byte[] data = Base64.getDecoder().decode(connectRecord.getData().toString()); + connectRecord.setData(data); + } + } + if (request.getRoutingContext() != null) { + connectRecord.addExtension("routingContext", request.getRoutingContext()); + } return connectRecord; } } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java index e86efcbf3..fac8c0d80 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java @@ -132,7 +132,7 @@ public class GitHubProtocol implements Protocol { // Create and store the webhook request Map<String, String> headerMap = headers.entries().stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - WebhookRequest webhookRequest = new WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, payloadStr); + WebhookRequest webhookRequest = new WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, payloadStr, ctx); if (!queue.offer(webhookRequest)) { throw new IllegalStateException("Failed to store the request."); diff --git a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService similarity index 63% copy from eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload copy to eventmesh-connectors/eventmesh-connector-http/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService index 82d5c94dd..d62ff1199 100644 --- a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService @@ -1,3 +1,4 @@ +# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -12,11 +13,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# + -org.apache.eventmesh.common.remote.request.FetchJobRequest -org.apache.eventmesh.common.remote.response.FetchJobResponse -org.apache.eventmesh.common.remote.request.ReportPositionRequest -org.apache.eventmesh.common.remote.request.ReportVerifyRequest -org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest -org.apache.eventmesh.common.remote.request.FetchPositionRequest -org.apache.eventmesh.common.remote.response.FetchPositionResponse \ No newline at end of file +HTTP-Source=org.apache.eventmesh.connector.http.source.HttpSourceConnector +HTTP-Sink=org.apache.eventmesh.connector.http.sink.HttpSinkConnector diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java index 7ddba511c..5f65f0749 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java @@ -20,8 +20,8 @@ package org.apache.eventmesh.connector.http.sink; import static org.mockserver.model.HttpRequest.request; -import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig; -import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig; +import org.apache.eventmesh.common.config.connector.http.HttpSinkConfig; +import org.apache.eventmesh.common.config.connector.http.HttpWebhookConfig; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.eventmesh.openconnect.util.ConfigUtil; diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SinkConnectorContext.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SinkConnectorContext.java index cf1b85347..d84dcfbb9 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SinkConnectorContext.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SinkConnectorContext.java @@ -21,6 +21,8 @@ import org.apache.eventmesh.common.config.connector.SinkConfig; import lombok.Data; +import java.util.Map; + /** * Sink Connector Context */ @@ -29,4 +31,6 @@ public class SinkConnectorContext implements ConnectorContext { public SinkConfig sinkConfig; + public Map<String, Object> runtimeConfig; + } diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java index 977661b13..993352a97 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java @@ -112,6 +112,8 @@ public class AdminOffsetService implements OffsetManagementService { reportPositionRequest.setRecordPositionList(recordToSyncList); + log.debug("start report position request: {}", JsonUtils.toJSONString(reportPositionRequest)); + Metadata metadata = Metadata.newBuilder() .setType(ReportPositionRequest.class.getSimpleName()) .build(); @@ -121,6 +123,7 @@ public class AdminOffsetService implements OffsetManagementService { .build()) .build(); requestObserver.onNext(payload); + log.debug("end report position request: {}", JsonUtils.toJSONString(reportPositionRequest)); for (Map.Entry<RecordPartition, RecordOffset> entry : recordMap.entrySet()) { positionStore.remove(entry.getKey()); diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java index a0390c189..891df482b 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java @@ -23,6 +23,11 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter public class DefaultKeyValue implements KeyValue { private final Map<String, Object> properties; diff --git a/eventmesh-runtime-v2/build.gradle b/eventmesh-runtime-v2/build.gradle index ecba7bffb..04b460ade 100644 --- a/eventmesh-runtime-v2/build.gradle +++ b/eventmesh-runtime-v2/build.gradle @@ -35,6 +35,7 @@ dependencies { implementation project(":eventmesh-openconnect:eventmesh-openconnect-java") implementation project(":eventmesh-common") implementation project(":eventmesh-connectors:eventmesh-connector-canal") + implementation project(":eventmesh-connectors:eventmesh-connector-http") implementation project(":eventmesh-meta:eventmesh-meta-api") implementation project(":eventmesh-meta:eventmesh-meta-nacos") implementation project(":eventmesh-registry:eventmesh-registry-api") diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java index 501f222fd..0f7d4e4ed 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java @@ -31,8 +31,10 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.Ad import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceStub; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; +import org.apache.eventmesh.common.remote.JobState; import org.apache.eventmesh.common.remote.request.FetchJobRequest; import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; +import org.apache.eventmesh.common.remote.request.ReportJobRequest; import org.apache.eventmesh.common.remote.request.ReportVerifyRequest; import org.apache.eventmesh.common.remote.response.FetchJobResponse; import org.apache.eventmesh.common.utils.IPUtils; @@ -129,10 +131,14 @@ public class ConnectorRuntime implements Runtime { private final ScheduledExecutorService heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(); + private final ExecutorService reportVerifyExecutor = Executors.newSingleThreadExecutor(); + private final BlockingQueue<ConnectRecord> queue; private volatile boolean isRunning = false; + private volatile boolean isFailed = false; + public static final String CALLBACK_EXTENSION = "callBackExtension"; private String adminServerAddr; @@ -207,6 +213,8 @@ public class ConnectorRuntime implements Runtime { FetchJobResponse jobResponse = fetchJobConfig(); if (jobResponse == null) { + isFailed = true; + stop(); throw new RuntimeException("fetch job config fail"); } @@ -258,8 +266,11 @@ public class ConnectorRuntime implements Runtime { SinkConfig sinkConfig = (SinkConfig) ConfigUtil.parse(connectorRuntimeConfig.getSinkConnectorConfig(), sinkConnector.configClass()); SinkConnectorContext sinkConnectorContext = new SinkConnectorContext(); sinkConnectorContext.setSinkConfig(sinkConfig); + sinkConnectorContext.setRuntimeConfig(connectorRuntimeConfig.getRuntimeConfig()); sinkConnector.init(sinkConnectorContext); + reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.INIT); + } private FetchJobResponse fetchJobConfig() { @@ -306,6 +317,7 @@ public class ConnectorRuntime implements Runtime { try { startSinkConnector(); } catch (Exception e) { + isFailed = true; log.error("sink connector [{}] start fail", sinkConnector.name(), e); try { this.stop(); @@ -320,6 +332,7 @@ public class ConnectorRuntime implements Runtime { try { startSourceConnector(); } catch (Exception e) { + isFailed = true; log.error("source connector [{}] start fail", sourceConnector.name(), e); try { this.stop(); @@ -329,15 +342,25 @@ public class ConnectorRuntime implements Runtime { throw new RuntimeException(e); } }); + + reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.RUNNING); } @Override public void stop() throws Exception { + log.info("ConnectorRuntime start stop"); + isRunning = false; + if (isFailed) { + reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.FAIL); + } else { + reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.COMPLETE); + } sourceConnector.stop(); sinkConnector.stop(); sourceService.shutdown(); sinkService.shutdown(); heartBeatExecutor.shutdown(); + reportVerifyExecutor.shutdown(); requestObserver.onCompleted(); if (channel != null && !channel.isShutdown()) { channel.shutdown(); @@ -351,6 +374,10 @@ public class ConnectorRuntime implements Runtime { // TODO: use producer pub record to storage replace below if (connectorRecordList != null && !connectorRecordList.isEmpty()) { for (ConnectRecord record : connectorRecordList) { + // check recordUniqueId + if (record.getExtensions() == null || !record.getExtensions().containsKey("recordUniqueId")) { + record.addExtension("recordUniqueId", record.getRecordId()); + } queue.put(record); @@ -364,10 +391,18 @@ public class ConnectorRuntime implements Runtime { record.setCallback(new SendMessageCallback() { @Override public void onSuccess(SendResult result) { + log.debug("send record to sink callback success, record: {}", record); // commit record sourceConnector.commit(record); - Optional<RecordOffsetManagement.SubmittedPosition> submittedRecordPosition = prepareToUpdateRecordOffset(record); - submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack); + if (record.getPosition() != null) { + Optional<RecordOffsetManagement.SubmittedPosition> submittedRecordPosition = prepareToUpdateRecordOffset(record); + submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack); + log.debug("start wait all messages to commit"); + offsetManagement.awaitAllMessages(5000, TimeUnit.MILLISECONDS); + // update & commit offset + updateCommittableOffsets(); + commitOffsets(); + } Optional<SendMessageCallback> callback = Optional.ofNullable(record.getExtensionObj(CALLBACK_EXTENSION)).map(v -> (SendMessageCallback) v); callback.ifPresent(cb -> cb.onSuccess(convertToSendResult(record))); @@ -375,6 +410,7 @@ public class ConnectorRuntime implements Runtime { @Override public void onException(SendExceptionContext sendExceptionContext) { + isFailed = true; // handle exception sourceConnector.onException(record); log.error("send record to sink callback exception, process will shut down, record: {}", record, @@ -386,11 +422,6 @@ public class ConnectorRuntime implements Runtime { } } }); - - offsetManagement.awaitAllMessages(5000, TimeUnit.MILLISECONDS); - // update & commit offset - updateCommittableOffsets(); - commitOffsets(); } } } @@ -406,24 +437,47 @@ public class ConnectorRuntime implements Runtime { } private void reportVerifyRequest(ConnectRecord record, ConnectorRuntimeConfig connectorRuntimeConfig, ConnectorStage connectorStage) { - String md5Str = md5(record.toString()); - ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest(); - reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID()); - reportVerifyRequest.setRecordID(record.getRecordId()); - reportVerifyRequest.setRecordSig(md5Str); - reportVerifyRequest.setConnectorName( - IPUtils.getLocalAddress() + "_" + connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion()); - reportVerifyRequest.setConnectorStage(connectorStage.name()); - reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition())); - - Metadata metadata = Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build(); - - Payload request = Payload.newBuilder().setMetadata(metadata) - .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest)))) - .build()) - .build(); + reportVerifyExecutor.submit(() -> { + try { + // use record data + recordUniqueId for md5 + String md5Str = md5(record.getData().toString() + record.getExtension("recordUniqueId")); + ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest(); + reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID()); + reportVerifyRequest.setJobID(connectorRuntimeConfig.getJobID()); + reportVerifyRequest.setRecordID(record.getRecordId()); + reportVerifyRequest.setRecordSig(md5Str); + reportVerifyRequest.setConnectorName( + IPUtils.getLocalAddress() + "_" + connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion()); + reportVerifyRequest.setConnectorStage(connectorStage.name()); + reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition())); + + Metadata metadata = Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build(); + + Payload request = Payload.newBuilder().setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest)))) + .build()) + .build(); + + requestObserver.onNext(request); + } catch (Exception e) { + log.error("Failed to report verify request", e); + } + }); + } - requestObserver.onNext(request); + private void reportJobRequest(String jobId, JobState jobState) throws InterruptedException { + ReportJobRequest reportJobRequest = new ReportJobRequest(); + reportJobRequest.setJobID(jobId); + reportJobRequest.setState(jobState); + Metadata metadata = Metadata.newBuilder() + .setType(ReportJobRequest.class.getSimpleName()) + .build(); + Payload payload = Payload.newBuilder() + .setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportJobRequest)))) + .build()) + .build(); + requestObserver.onNext(payload); } private String md5(String input) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
