This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 9701f0266 [ISSUE #5079] Enhancement update for admin-server (#5080)
9701f0266 is described below
commit 9701f02660ba04ff37bd0b5787b614a3c91d8bfd
Author: mike_xwm <[email protected]>
AuthorDate: Wed Aug 21 11:26:23 2024 +0800
[ISSUE #5079] Enhancement update for admin-server (#5080)
* [ISSUE #5079] Enhancement update for admin-server
* fix check style error
* fix check style error
---
eventmesh-admin-server/bin/start-admin.sh | 51 ++++++++-------
eventmesh-admin-server/build.gradle | 2 +
eventmesh-admin-server/conf/application.yaml | 8 +--
eventmesh-admin-server/conf/eventmesh.sql | 2 +-
.../conf/mapper/EventMeshVerifyMapper.xml | 5 +-
.../eventmesh/admin/server/web/HttpServer.java | 23 +++++++
.../admin/server/web/db/DBThreadPool.java | 26 +++++++-
.../server/web/db/entity/EventMeshVerify.java | 3 +
.../web/handler/impl/FetchJobRequestHandler.java | 2 +-
...fyHandler.java => ReportJobRequestHandler.java} | 30 ++++++---
.../web/handler/impl/ReportPositionHandler.java | 2 +
.../web/handler/impl/ReportVerifyHandler.java | 49 +++++++++++++-
.../server/web/service/job/JobInfoBizService.java | 74 +++++++++++++++++++++-
.../service/position/impl/HttpPositionHandler.java | 61 ++++++++++++++++++
.../web/service/verify/VerifyBizService.java | 2 +
.../apache/eventmesh/common/remote/JobState.java | 52 +++++++++++++++
.../eventmesh/common/remote/TransportType.java | 1 +
...ortVerifyRequest.java => ReportJobRequest.java} | 15 ++---
.../common/remote/request/ReportVerifyRequest.java | 2 +
...apache.eventmesh.common.remote.payload.IPayload | 1 +
.../offsetmgmt/admin/AdminOffsetService.java | 3 +
21 files changed, 353 insertions(+), 61 deletions(-)
diff --git a/eventmesh-admin-server/bin/start-admin.sh
b/eventmesh-admin-server/bin/start-admin.sh
index 93c364439..163303661 100644
--- a/eventmesh-admin-server/bin/start-admin.sh
+++ b/eventmesh-admin-server/bin/start-admin.sh
@@ -56,34 +56,34 @@ function extract_java_version {
#}
function get_pid {
- local ppid=""
- if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
- ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
- # If the process does not exist, it indicates that the previous
process terminated abnormally.
+ local ppid=""
+ if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
+ ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
+ # If the process does not exist, it indicates that the
previous process terminated abnormally.
if [ ! -d /proc/$ppid ]; then
# Remove the residual file.
rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
echo -e "ERROR\t EventMesh process had already terminated unexpectedly
before, please check log output."
ppid=""
fi
- else
- if [[ $OS =~ Msys ]]; then
- # There is a Bug on Msys that may not be able to kill
the identified process
- ppid=`jps -v | grep -i
"org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v
grep | awk -F ' ' {'print $1'}`
- elif [[ $OS =~ Darwin ]]; then
- # Known problem: grep Java may not be able to
accurately identify Java processes
- ppid=$(/bin/ps -o user,pid,command | grep "java" | grep
-i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root"
|awk -F ' ' {'print $2'})
- else
- if [ $DOCKER ]; then
- # No need to exclude root user in Docker containers.
- ppid=$(ps -C java -o user,pid,command --cols 99999 | grep
-w $EVENTMESH_ADMIN_HOME | grep -i
"org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print
$2'})
- else
+ else
+ if [[ $OS =~ Msys ]]; then
+ # There is a Bug on Msys that may not be able to kill
the identified process
+ ppid=`jps -v | grep -i
"org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v
grep | awk -F ' ' {'print $1'}`
+ elif [[ $OS =~ Darwin ]]; then
+ # Known problem: grep Java may not be able to
accurately identify Java processes
+ ppid=$(/bin/ps -o user,pid,command | grep "java" |
grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev
"^root" |awk -F ' ' {'print $2'})
+ else
+ if [ $DOCKER ]; then
+ # No need to exclude root user in Docker containers.
+ ppid=$(ps -C java -o user,pid,command --cols 99999 | grep
-w $EVENTMESH_ADMIN_HOME | grep -i
"org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print
$2'})
+ else
# It is required to identify the process as accurately as possible on
Linux.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w
$EVENTMESH_ADMIN_HOME | grep -i
"org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk
-F ' ' {'print $2'})
fi
- fi
- fi
- echo "$ppid";
+ fi
+ fi
+ echo "$ppid";
}
#===========================================================================================
@@ -136,8 +136,7 @@ export JAVA_HOME
GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log"
-#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m
-XX:SurvivorRatio=4"
-JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep
APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}`
+JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30
-XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
JAVA_OPT="${JAVA_OPT} -verbose:gc"
if [[ "$JAVA_VERSION" == "8" ]]; then
@@ -172,7 +171,7 @@ JAVA_OPT="${JAVA_OPT}
-DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin"
# echo "proxy is running already"
# exit 9;
# else
-# echo "err pid$pid, rm pid.file"
+# echo "err pid$pid, rm pid.file"
# rm pid.file
# fi
#fi
@@ -183,8 +182,8 @@ if [[ $pid == "ERROR"* ]]; then
exit 9
fi
if [ -n "$pid" ]; then
- echo -e "ERROR\t The server is already running (pid=$pid), there is no
need to execute start.sh again."
- exit 9
+ echo -e "ERROR\t The server is already running (pid=$pid), there is no
need to execute start.sh again."
+ exit 9
fi
make_logs_dir
@@ -193,9 +192,9 @@ echo "Using Java version: $JAVA_VERSION, path: $JAVA" >>
${EVENTMESH_ADMIN_LOG_H
EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer
if [ $DOCKER ]; then
- $JAVA $JAVA_OPT -classpath
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
$EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
+ $JAVA $JAVA_OPT -classpath
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
$EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
else
- $JAVA $JAVA_OPT -classpath
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
$EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
+ $JAVA $JAVA_OPT -classpath
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
$EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
fi
exit 0
diff --git a/eventmesh-admin-server/build.gradle
b/eventmesh-admin-server/build.gradle
index 1fec2c7c5..95c8fa137 100644
--- a/eventmesh-admin-server/build.gradle
+++ b/eventmesh-admin-server/build.gradle
@@ -38,6 +38,8 @@ dependencies {
implementation "com.alibaba:druid-spring-boot-starter"
compileOnly 'com.mysql:mysql-connector-j'
compileOnly 'org.projectlombok:lombok'
+ testImplementation 'junit:junit:4.12'
+ testImplementation 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
diff --git a/eventmesh-admin-server/conf/application.yaml
b/eventmesh-admin-server/conf/application.yaml
index 274196db6..3d702e579 100644
--- a/eventmesh-admin-server/conf/application.yaml
+++ b/eventmesh-admin-server/conf/application.yaml
@@ -35,8 +35,8 @@ event-mesh:
# grpc server port
port: 8081
adminServerList:
- region1:
+ R1:
- http://localhost:8082
- region2:
- - http://localhost:8083
- region: region1
\ No newline at end of file
+ R2:
+ - http://localhost:8082
+ region: R1
\ No newline at end of file
diff --git a/eventmesh-admin-server/conf/eventmesh.sql
b/eventmesh-admin-server/conf/eventmesh.sql
index 986320570..6e28daca8 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -102,7 +102,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` (
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
- UNIQUE KEY `runtimeAddr` (`runtimeAddr`),
KEY `jobID` (`jobID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
@@ -137,6 +136,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+ `jobID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`recordID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`recordSig` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`connectorName` varchar(200) COLLATE utf8_bin DEFAULT NULL,
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
index b7b042145..45727498c 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
@@ -26,6 +26,7 @@
<resultMap id="BaseResultMap"
type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify">
<id property="id" column="id" jdbcType="INTEGER"/>
<result property="taskID" column="taskID" jdbcType="VARCHAR"/>
+ <result property="jobID" column="jobID" jdbcType="VARCHAR"/>
<result property="recordID" column="recordID" jdbcType="VARCHAR"/>
<result property="recordSig" column="recordSig"
jdbcType="VARCHAR"/>
<result property="connectorName" column="connectorName"
jdbcType="VARCHAR"/>
@@ -35,8 +36,8 @@
</resultMap>
<sql id="Base_Column_List">
- id,taskID,recordID,
- recordSig,connectorName,connectorStage,
+ id,taskID,jobID,recordID,
+ recordSig,connectorName,connectorStage,
position,createTime
</sql>
</mapper>
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
index 12afb3a3d..2454e9f02 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
@@ -18,7 +18,9 @@
package org.apache.eventmesh.admin.server.web;
import org.apache.eventmesh.admin.server.web.service.task.TaskBizService;
+import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService;
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
+import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
import org.apache.eventmesh.common.remote.response.CreateTaskResponse;
import org.apache.eventmesh.common.utils.JsonUtils;
@@ -29,19 +31,40 @@ import
org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
+import lombok.extern.slf4j.Slf4j;
+
@RestController
@RequestMapping("/eventmesh/admin")
+@Slf4j
public class HttpServer {
@Autowired
private TaskBizService taskService;
+ @Autowired
+ private VerifyBizService verifyService;
+
@RequestMapping(value = "/createTask", method = RequestMethod.POST)
public ResponseEntity<Object> createOrUpdateTask(@RequestBody
CreateTaskRequest task) {
+ log.info("receive http proto create task:{}", task);
CreateTaskResponse createTaskResponse = taskService.createTask(task);
+ log.info("receive http proto create task result:{}",
createTaskResponse);
return
ResponseEntity.ok(JsonUtils.toJSONString(Response.success(createTaskResponse)));
}
+
+ @RequestMapping(value = "/reportVerify", method = RequestMethod.POST)
+ public ResponseEntity<Object> reportVerify(@RequestBody
ReportVerifyRequest request) {
+ log.info("receive http proto report verify request:{}", request);
+ boolean result = verifyService.reportVerifyRecord(request);
+ log.info("receive http proto report verify result:{}", result);
+ if (result) {
+ return ResponseEntity.ok("report verify success.request:" +
JsonUtils.toJSONString(request));
+ } else {
+ return ResponseEntity.internalServerError().body("report verify
success.request:" + JsonUtils.toJSONString(request));
+ }
+ }
+
public boolean deleteTask(Long id) {
return false;
}
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
index f1de76496..277ea6665 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
@@ -20,6 +20,7 @@ package org.apache.eventmesh.admin.server.web.db;
import org.apache.eventmesh.common.EventMeshThreadFactory;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -39,17 +40,34 @@ public class DBThreadPool {
new LinkedBlockingQueue<>(1000), new
EventMeshThreadFactory("admin-server-db"),
new ThreadPoolExecutor.DiscardOldestPolicy());
+
+ private final ScheduledThreadPoolExecutor checkScheduledExecutor =
+ new
ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new
EventMeshThreadFactory("admin-server-check-scheduled"),
+ new ThreadPoolExecutor.DiscardOldestPolicy());
+
@PreDestroy
private void destroy() {
if (!executor.isShutdown()) {
try {
executor.shutdown();
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
- log.info("wait heart beat handler thread pool shutdown
timeout, it will shutdown immediately");
+ log.info("wait handler thread pool shutdown timeout, it
will shutdown immediately");
executor.shutdownNow();
}
} catch (InterruptedException e) {
- log.warn("wait heart beat handler thread pool shutdown fail");
+ log.warn("wait handler thread pool shutdown fail");
+ }
+ }
+
+ if (!checkScheduledExecutor.isShutdown()) {
+ try {
+ checkScheduledExecutor.shutdown();
+ if (!checkScheduledExecutor.awaitTermination(30,
TimeUnit.SECONDS)) {
+ log.info("wait scheduled thread pool shutdown timeout, it
will shutdown immediately");
+ checkScheduledExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ log.warn("wait scheduled thread pool shutdown fail");
}
}
}
@@ -57,4 +75,8 @@ public class DBThreadPool {
public ThreadPoolExecutor getExecutors() {
return executor;
}
+
+ public ScheduledThreadPoolExecutor getCheckExecutor() {
+ return checkScheduledExecutor;
+ }
}
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
index 5425c5c57..c5a6c35f8 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
@@ -32,11 +32,14 @@ import lombok.Data;
@TableName(value = "event_mesh_verify")
@Data
public class EventMeshVerify implements Serializable {
+
@TableId(type = IdType.AUTO)
private Integer id;
private String taskID;
+ private String jobID;
+
private String recordID;
private String recordSig;
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
index b377bcddd..3392084c2 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
@@ -56,7 +56,7 @@ public class FetchJobRequestHandler extends
BaseRequestHandler<FetchJobRequest,
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf()));
config.setSourceConnectorDesc(detail.getSourceConnectorDesc());
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf()));
- config.setSourceConnectorDesc(detail.getSinkConnectorDesc());
+ config.setSinkConnectorDesc(detail.getSinkConnectorDesc());
response.setConnectorConfig(config);
response.setTransportType(detail.getTransportType());
response.setState(detail.getState());
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java
similarity index 54%
copy from
eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
copy to
eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java
index 39963494c..ea836ce7a 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java
@@ -17,11 +17,12 @@
package org.apache.eventmesh.admin.server.web.handler.impl;
+import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
-import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService;
+import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
-import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
+import org.apache.eventmesh.common.remote.request.ReportJobRequest;
import org.apache.eventmesh.common.remote.response.SimpleResponse;
import org.apache.commons.lang3.StringUtils;
@@ -33,17 +34,26 @@ import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
-public class ReportVerifyHandler extends
BaseRequestHandler<ReportVerifyRequest, SimpleResponse> {
+public class ReportJobRequestHandler extends
BaseRequestHandler<ReportJobRequest, SimpleResponse> {
+
@Autowired
- private VerifyBizService verifyService;
+ JobInfoBizService jobInfoBizService;
@Override
- protected SimpleResponse handler(ReportVerifyRequest request, Metadata
metadata) {
- if (StringUtils.isAnyBlank(request.getTaskID(),
request.getRecordSig(), request.getRecordID(), request.getConnectorStage())) {
- log.info("report verify request [{}] illegal", request);
- return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task
id, sign, record id or stage is none");
+ public SimpleResponse handler(ReportJobRequest request, Metadata metadata)
{
+ log.info("receive report job request:{}", request);
+ if (StringUtils.isBlank(request.getJobID())) {
+ return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id,
it's empty");
+ }
+ EventMeshJobInfo jobInfo =
jobInfoBizService.getJobInfo(request.getJobID());
+ if (jobInfo == null) {
+ return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id,
not exist target job,jobID:" + request.getJobID());
+ }
+ boolean result = jobInfoBizService.updateJobState(jobInfo.getJobID(),
request.getState());
+ if (result) {
+ return SimpleResponse.success();
+ } else {
+ return SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "update job
failed.");
}
- return verifyService.reportVerifyRecord(request) ?
SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save
verify "
- + "request fail");
}
}
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
index 5e2a96826..7a30bef80 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
@@ -37,6 +37,7 @@ import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class ReportPositionHandler extends
BaseRequestHandler<ReportPositionRequest, SimpleResponse> {
+
@Autowired
private JobInfoBizService jobInfoBizService;
@@ -48,6 +49,7 @@ public class ReportPositionHandler extends
BaseRequestHandler<ReportPositionRequ
@Override
protected SimpleResponse handler(ReportPositionRequest request, Metadata
metadata) {
+ log.info("receive report position request:{}", request);
if (StringUtils.isBlank(request.getJobID())) {
log.info("request [{}] illegal job id", request);
return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id,
it's empty");
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
index 39963494c..9844f47c6 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
@@ -17,7 +17,10 @@
package org.apache.eventmesh.admin.server.web.handler.impl;
+import org.apache.eventmesh.admin.server.AdminServerProperties;
+import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
+import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
@@ -26,24 +29,64 @@ import
org.apache.eventmesh.common.remote.response.SimpleResponse;
import org.apache.commons.lang3.StringUtils;
+import java.util.List;
+import java.util.Random;
+
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class ReportVerifyHandler extends
BaseRequestHandler<ReportVerifyRequest, SimpleResponse> {
+
@Autowired
private VerifyBizService verifyService;
+ @Autowired
+ JobInfoBizService jobInfoBizService;
+
+ @Autowired
+ private AdminServerProperties properties;
+
@Override
protected SimpleResponse handler(ReportVerifyRequest request, Metadata
metadata) {
- if (StringUtils.isAnyBlank(request.getTaskID(),
request.getRecordSig(), request.getRecordID(), request.getConnectorStage())) {
+ if (StringUtils.isAnyBlank(request.getTaskID(), request.getJobID(),
request.getRecordSig(), request.getRecordID(),
+ request.getConnectorStage())) {
log.info("report verify request [{}] illegal", request);
- return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task
id, sign, record id or stage is none");
+ return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task
id,job id, sign, record id or stage is none");
+ }
+
+ String jobID = request.getJobID();
+ EventMeshJobInfo jobInfo = jobInfoBizService.getJobInfo(jobID);
+ if (jobInfo == null || StringUtils.isBlank(jobInfo.getFromRegion())) {
+ log.info("report verify job info [{}] illegal", request);
+ return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "job info is
null or fromRegion is blank,job id:" + jobID);
}
- return verifyService.reportVerifyRecord(request) ?
SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save
verify "
+
+ String fromRegion = jobInfo.getFromRegion();
+ String localRegion = properties.getRegion();
+ log.info("report verify request from
region:{},localRegion:{},request:{}", fromRegion, localRegion, request);
+ if (fromRegion.equalsIgnoreCase(localRegion)) {
+ return verifyService.reportVerifyRecord(request) ?
SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save
verify "
+ "request fail");
+ } else {
+ log.info("start transfer report verify to from region admin
server. from region:{}", fromRegion);
+ List<String> adminServerList =
properties.getAdminServerList().get(fromRegion);
+ if (adminServerList == null || adminServerList.isEmpty()) {
+ throw new RuntimeException("No admin server available for
region: " + fromRegion);
+ }
+ String targetUrl = adminServerList.get(new
Random().nextInt(adminServerList.size())) + "/eventmesh/admin/reportVerify";
+ RestTemplate restTemplate = new RestTemplate();
+ ResponseEntity<String> response =
restTemplate.postForEntity(targetUrl, request, String.class);
+ if (!response.getStatusCode().is2xxSuccessful()) {
+ return SimpleResponse.fail(ErrorCode.INTERNAL_ERR,
+ "save verify request fail,code:" +
response.getStatusCode() + ",msg:" + response.getBody());
+ }
+ return SimpleResponse.success();
+ }
}
}
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
index 0657383e2..a8b469d8b 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
@@ -19,15 +19,19 @@ package org.apache.eventmesh.admin.server.web.service.job;
import org.apache.eventmesh.admin.server.AdminServerProperties;
import org.apache.eventmesh.admin.server.AdminServerRuntimeException;
+import org.apache.eventmesh.admin.server.web.db.DBThreadPool;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
+import
org.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat;
import
org.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService;
import
org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoExtService;
import
org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService;
+import
org.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService;
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
import
org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService;
import
org.apache.eventmesh.admin.server.web.service.position.PositionBizService;
import org.apache.eventmesh.common.config.connector.Config;
+import org.apache.eventmesh.common.remote.JobState;
import org.apache.eventmesh.common.remote.TaskState;
import org.apache.eventmesh.common.remote.TransportType;
import org.apache.eventmesh.common.remote.datasource.DataSource;
@@ -38,9 +42,13 @@ import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.commons.lang3.StringUtils;
+import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -75,13 +83,41 @@ public class JobInfoBizService {
@Autowired
private AdminServerProperties properties;
+ @Autowired
+ EventMeshRuntimeHeartbeatService heartbeatService;
+
+ private final long heatBeatPeriod = Duration.ofMillis(5000).toMillis();
+
+ @Autowired
+ DBThreadPool executor;
+
+ @PostConstruct
+ public void init() {
+ log.info("init check job info scheduled task.");
+ executor.getCheckExecutor().scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ checkJobInfo();
+ }
+ }, 10, 10, TimeUnit.SECONDS);
+ }
+
public boolean updateJobState(String jobID, TaskState state) {
if (jobID == null || state == null) {
return false;
}
EventMeshJobInfo jobInfo = new EventMeshJobInfo();
jobInfo.setJobState(state.name());
- return jobInfoService.update(jobInfo,
Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("state",
TaskState.DELETE.name()));
+ return jobInfoService.update(jobInfo,
Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("jobState",
JobState.DELETE.name()));
+ }
+
+ public boolean updateJobState(String jobID, JobState state) {
+ if (jobID == null || state == null) {
+ return false;
+ }
+ EventMeshJobInfo jobInfo = new EventMeshJobInfo();
+ jobInfo.setJobState(state.name());
+ return jobInfoService.update(jobInfo,
Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("jobState",
JobState.DELETE.name()));
}
@Transactional
@@ -114,7 +150,8 @@ public class JobInfoBizService {
source.setOperator(job.getCreateUid());
source.setRegion(job.getSourceDataSource().getRegion());
source.setDesc(job.getSourceConnectorDesc());
- source.setConfig(job.getSourceDataSource().getConf());
+ Config sourceConfig = job.getSourceDataSource().getConf();
+ source.setConfig(sourceConfig);
source.setConfigClass(job.getSourceDataSource().getConfClazz().getName());
EventMeshDataSource createdSource =
dataSourceBizService.createDataSource(source);
entity.setSourceData(createdSource.getId());
@@ -124,7 +161,8 @@ public class JobInfoBizService {
sink.setOperator(job.getCreateUid());
sink.setRegion(job.getSinkDataSource().getRegion());
sink.setDesc(job.getSinkConnectorDesc());
- sink.setConfig(job.getSinkDataSource().getConf());
+ Config sinkConfig = job.getSinkDataSource().getConf();
+ sink.setConfig(sinkConfig);
sink.setConfigClass(job.getSinkDataSource().getConfClazz().getName());
EventMeshDataSource createdSink =
dataSourceBizService.createDataSource(sink);
entity.setTargetData(createdSink.getId());
@@ -195,6 +233,36 @@ public class JobInfoBizService {
detail.setTransportType(TransportType.getTransportType(job.getTransportType()));
return detail;
}
+
+ public EventMeshJobInfo getJobInfo(String jobID) {
+ if (jobID == null) {
+ return null;
+ }
+ EventMeshJobInfo job =
jobInfoService.getOne(Wrappers.<EventMeshJobInfo>query().eq("jobID", jobID));
+ return job;
+ }
+
+ public void checkJobInfo() {
+ List<EventMeshJobInfo> eventMeshJobInfoList =
jobInfoService.list(Wrappers.<EventMeshJobInfo>query().eq("jobState",
JobState.RUNNING.name()));
+ log.info("start check job info.to check job size:{}",
eventMeshJobInfoList.size());
+ for (EventMeshJobInfo jobInfo : eventMeshJobInfoList) {
+ String jobID = jobInfo.getJobID();
+ if (StringUtils.isEmpty(jobID)) {
+ continue;
+ }
+ EventMeshRuntimeHeartbeat heartbeat =
heartbeatService.getOne(Wrappers.<EventMeshRuntimeHeartbeat>query().eq("jobID",
jobID));
+ if (heartbeat == null) {
+ continue;
+ }
+ // if last heart beat update time have delay three period.print
job heart beat delay warn
+ long currentTimeStamp = System.currentTimeMillis();
+ if (currentTimeStamp - heartbeat.getUpdateTime().getTime() > 3 *
heatBeatPeriod) {
+ log.warn("current job heart heart has
delay.jobID:{},currentTimeStamp:{},last update time:{}", jobID,
currentTimeStamp,
+ heartbeat.getUpdateTime());
+ }
+ }
+ }
+
}
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java
new file mode 100644
index 000000000..b8d536f38
--- /dev/null
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.admin.server.web.service.position.impl;
+
+import
org.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService;
+import org.apache.eventmesh.admin.server.web.service.position.PositionHandler;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.datasource.DataSourceType;
+import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
+import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Component
+@Slf4j
+public class HttpPositionHandler extends PositionHandler {
+
+ @Autowired
+ EventMeshPositionReporterHistoryService historyService;
+
+ @Override
+ protected DataSourceType getSourceType() {
+ return DataSourceType.HTTP;
+ }
+
+ @Override
+ public boolean handler(ReportPositionRequest request, Metadata metadata) {
+ log.info("receive http position report request:{}", request);
+ // mock wemq postion report store
+ return true;
+ }
+
+ @Override
+ public List<RecordPosition> handler(FetchPositionRequest request, Metadata
metadata) {
+ // mock http position fetch request
+ List<RecordPosition> recordPositionList = new ArrayList<>();
+ return recordPositionList;
+ }
+}
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
index 74f208b19..e4f08b30c 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
@@ -26,6 +26,7 @@ import org.springframework.stereotype.Service;
@Service
public class VerifyBizService {
+
@Autowired
private EventMeshVerifyService verifyService;
@@ -35,6 +36,7 @@ public class VerifyBizService {
verify.setRecordSig(request.getRecordSig());
verify.setPosition(request.getPosition());
verify.setTaskID(request.getTaskID());
+ verify.setJobID(request.getJobID());
verify.setConnectorName(request.getConnectorName());
verify.setConnectorStage(request.getConnectorStage());
return verifyService.save(verify);
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java
new file mode 100644
index 000000000..da9daffe9
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.remote;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.ToString;
+
+@ToString
+public enum JobState {
+ INIT, RUNNING, COMPLETE, DELETE, FAIL;
+ private static final JobState[] STATES_NUM_INDEX = JobState.values();
+ private static final Map<String, JobState> STATES_NAME_INDEX = new
HashMap<>();
+
+ static {
+ for (JobState jobState : STATES_NUM_INDEX) {
+ STATES_NAME_INDEX.put(jobState.name(), jobState);
+ }
+ }
+
+ public static JobState fromIndex(Integer index) {
+ if (index == null || index < 0 || index >= STATES_NUM_INDEX.length) {
+ return null;
+ }
+
+ return STATES_NUM_INDEX[index];
+ }
+
+ public static JobState fromIndex(String index) {
+ if (index == null || index.isEmpty()) {
+ return null;
+ }
+
+ return STATES_NAME_INDEX.get(index);
+ }
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
index 82e7bc021..6b4359839 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
@@ -35,6 +35,7 @@ public enum TransportType {
HTTP_REDIS(DataSourceType.HTTP, DataSourceType.REDIS),
HTTP_ROCKETMQ(DataSourceType.HTTP, DataSourceType.ROCKETMQ),
REDIS_MQ(DataSourceType.REDIS, DataSourceType.ROCKETMQ),
+ HTTP_HTTP(DataSourceType.HTTP, DataSourceType.HTTP),
;
private static final Map<String, TransportType> INDEX_TYPES = new
HashMap<>();
private static final TransportType[] TYPES = TransportType.values();
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java
similarity index 80%
copy from
eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
copy to
eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java
index cd541949f..aec33e461 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java
@@ -17,6 +17,8 @@
package org.apache.eventmesh.common.remote.request;
+import org.apache.eventmesh.common.remote.JobState;
+
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -24,17 +26,12 @@ import lombok.ToString;
@Data
@EqualsAndHashCode(callSuper = true)
@ToString
-public class ReportVerifyRequest extends BaseRemoteRequest {
-
- private String taskID;
-
- private String recordID;
+public class ReportJobRequest extends BaseRemoteRequest {
- private String recordSig;
+ private String jobID;
- private String connectorName;
+ private JobState state;
- private String connectorStage;
+ private String address;
- private String position;
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
index cd541949f..bd38881c3 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
@@ -28,6 +28,8 @@ public class ReportVerifyRequest extends BaseRemoteRequest {
private String taskID;
+ private String jobID;
+
private String recordID;
private String recordSig;
diff --git
a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
index 82d5c94dd..433cf57ed 100644
---
a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
+++
b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
@@ -16,6 +16,7 @@
org.apache.eventmesh.common.remote.request.FetchJobRequest
org.apache.eventmesh.common.remote.response.FetchJobResponse
org.apache.eventmesh.common.remote.request.ReportPositionRequest
+org.apache.eventmesh.common.remote.request.ReportJobRequest
org.apache.eventmesh.common.remote.request.ReportVerifyRequest
org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest
org.apache.eventmesh.common.remote.request.FetchPositionRequest
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
index 977661b13..993352a97 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
@@ -112,6 +112,8 @@ public class AdminOffsetService implements
OffsetManagementService {
reportPositionRequest.setRecordPositionList(recordToSyncList);
+ log.debug("start report position request: {}",
JsonUtils.toJSONString(reportPositionRequest));
+
Metadata metadata = Metadata.newBuilder()
.setType(ReportPositionRequest.class.getSimpleName())
.build();
@@ -121,6 +123,7 @@ public class AdminOffsetService implements
OffsetManagementService {
.build())
.build();
requestObserver.onNext(payload);
+ log.debug("end report position request: {}",
JsonUtils.toJSONString(reportPositionRequest));
for (Map.Entry<RecordPartition, RecordOffset> entry :
recordMap.entrySet()) {
positionStore.remove(entry.getKey());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]