This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 293a61ef0 [ISSUE #5137] update connector runtime v2 module  (#5138)
293a61ef0 is described below

commit 293a61ef071da403d608bc956f67fe5469936ae1
Author: mike_xwm <[email protected]>
AuthorDate: Mon Dec 9 15:12:32 2024 +0800

    [ISSUE #5137] update connector runtime v2 module  (#5138)
    
    * [ISSUE #5137] update connector runtime v2 module
    
    * fix checkStyle error
---
 .../remote/request/ReportMonitorRequest.java       |  21 +-
 .../api/monitor/AbstractConnectorMonitor.java      |  80 +++++++
 .../eventmesh/openconnect/api/monitor/Monitor.java |  15 +-
 .../openconnect/api/monitor/MonitorRegistry.java   |  17 +-
 .../runtime/boot/RuntimeInstanceStarter.java       |   1 -
 .../runtime/connector/ConnectorRuntime.java        | 230 ++++++++-------------
 .../runtime/service/health/HealthService.java      | 112 ++++++++++
 .../runtime/service/monitor/MonitorService.java    | 144 +++++++++++++
 .../runtime/service/monitor/SinkMonitor.java       |  52 +++++
 .../monitor/SourceMonitor.java}                    |  30 ++-
 .../runtime/service/status/StatusService.java      |  94 +++++++++
 .../runtime/service/verify/VerifyService.java      | 138 +++++++++++++
 .../eventmesh/runtime/util/RuntimeUtils.java       |  13 ++
 13 files changed, 797 insertions(+), 150 deletions(-)

diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportMonitorRequest.java
similarity index 59%
copy from 
eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
copy to 
eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportMonitorRequest.java
index e389357d9..12278df27 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportMonitorRequest.java
@@ -15,7 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.util;
+package org.apache.eventmesh.common.remote.request;
 
-public class RuntimeUtils {
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Remote request that carries monitor metrics for a task/job.
+ *
+ * <p>Lombok generates the accessors, {@code equals}/{@code hashCode} (including the
+ * superclass state, because of {@code callSuper = true}) and {@code toString}.
+ *
+ * <p>NOTE(review): the field meanings noted below are inferred from the field names and
+ * from the counters kept by {@code AbstractConnectorMonitor}; confirm them against the
+ * code that populates this request.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ToString
+public class ReportMonitorRequest extends BaseRemoteRequest {
+    // identifiers of the reporting task and job
+    private String taskID;
+    private String jobID;
+    // presumably the address of the reporting runtime -- TODO confirm
+    private String address;
+    // presumably a ConnectorStage name such as SOURCE or SINK -- TODO confirm
+    private String connectorStage;
+    private String transportType;
+    // aggregated counters; time costs are presumably in milliseconds -- TODO confirm
+    private long totalReqNum;
+    private long totalTimeCost;
+    private long maxTimeCost;
+    private long avgTimeCost;
+    private double tps;
 }
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/AbstractConnectorMonitor.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/AbstractConnectorMonitor.java
new file mode 100644
index 000000000..b9205804a
--- /dev/null
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/AbstractConnectorMonitor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.openconnect.api.monitor;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Base {@link Monitor} that accumulates the number of processed records and the time spent
+ * on them for one connector, and derives the average cost per record and the throughput
+ * whenever {@link #printMetrics()} runs.
+ *
+ * <p>{@code averageTime} and {@code tps} are recomputed only inside {@link #printMetrics()};
+ * the Lombok-generated getters return the values from the most recent call (0 before the
+ * first one). Both fields are {@code volatile} because those getters are not synchronized,
+ * while the writes happen under this object's monitor lock.
+ */
+@Slf4j
+@Getter
+public abstract class AbstractConnectorMonitor implements Monitor {
+
+    private final String taskId;
+    private final String jobId;
+    private final String ip;
+    private final LongAdder totalRecordNum;
+    private final LongAdder totalTimeCost;
+    protected final AtomicLong startTime;
+    private final AtomicLong maxTimeCost;
+    // written in printMetrics() under the lock, read lock-free through the generated getters
+    private volatile long averageTime = 0;
+    private volatile double tps = 0;
+
+    /**
+     * Creates a monitor with all counters at zero; {@code startTime} is set to the current
+     * wall-clock time in milliseconds.
+     *
+     * @param taskId id of the task this monitor belongs to
+     * @param jobId  id of the job this monitor belongs to
+     * @param ip     address printed with the metrics
+     */
+    public AbstractConnectorMonitor(String taskId, String jobId, String ip) {
+        this.taskId = taskId;
+        this.jobId = jobId;
+        this.ip = ip;
+        this.totalRecordNum = new LongAdder();
+        this.totalTimeCost = new LongAdder();
+        this.startTime = new AtomicLong(System.currentTimeMillis());
+        this.maxTimeCost = new AtomicLong();
+    }
+
+    /**
+     * Records one processed record.
+     *
+     * @param timeCost time spent on the record (reported as milliseconds by printMetrics)
+     */
+    @Override
+    public synchronized void recordProcess(long timeCost) {
+        totalRecordNum.increment();
+        totalTimeCost.add(timeCost);
+        maxTimeCost.updateAndGet(max -> Math.max(max, timeCost));
+    }
+
+    /**
+     * Records a batch of processed records.
+     *
+     * @param recordCount number of records in the batch
+     * @param timeCost    time spent on the whole batch; it is also compared as-is against
+     *                    the running maximum
+     */
+    @Override
+    public synchronized void recordProcess(int recordCount, long timeCost) {
+        totalRecordNum.add(recordCount);
+        totalTimeCost.add(timeCost);
+        maxTimeCost.updateAndGet(max -> Math.max(max, timeCost));
+    }
+
+    /**
+     * Recomputes {@code averageTime} and {@code tps} from the current counters and logs all
+     * metrics at INFO level.
+     */
+    @Override
+    public synchronized void printMetrics() {
+        long totalRecords = totalRecordNum.sum();
+        long totalCost = totalTimeCost.sum();
+        averageTime = totalRecords > 0 ? totalCost / totalRecords : 0;
+        // elapsed time is truncated to whole seconds, so tps stays 0 during the first second
+        long elapsedTime = (System.currentTimeMillis() - startTime.get()) / 1000; // in seconds
+        tps = elapsedTime > 0 ? (double) totalRecords / elapsedTime : 0;
+
+        log.info("========== Metrics ==========");
+        log.info("TaskId: {}|JobId: {}|ip: {}", taskId, jobId, ip);
+        log.info("Total records: {}", totalRecordNum);
+        log.info("Total time (ms): {}", totalTimeCost);
+        log.info("Max time per record (ms): {}", maxTimeCost);
+        log.info("Average time per record (ms): {}", averageTime);
+        log.info("TPS: {}", tps);
+    }
+}
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/Monitor.java
similarity index 73%
copy from 
eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
copy to 
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/Monitor.java
index e389357d9..4d4d9efb0 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/Monitor.java
@@ -15,7 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.util;
+package org.apache.eventmesh.openconnect.api.monitor;
 
-public class RuntimeUtils {
-}
+/**
+ * Monitor Interface.
+ * All monitors should implement this interface.
+ */
+public interface Monitor {
+    /**
+     * Records the processing of one unit of work.
+     * {@code AbstractConnectorMonitor} counts this as a single record.
+     *
+     * @param timeCost time spent on the processing
+     */
+    void recordProcess(long timeCost);
+
+    /**
+     * Records the processing of several records at once.
+     *
+     * @param recordCount number of records processed
+     * @param timeCost    time spent on the processing
+     */
+    void recordProcess(int recordCount, long timeCost);
+
+    /**
+     * Outputs the metrics collected so far.
+     * {@code AbstractConnectorMonitor} writes them to its logger.
+     */
+    void printMetrics();
+}
\ No newline at end of file
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/MonitorRegistry.java
similarity index 70%
copy from 
eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
copy to 
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/MonitorRegistry.java
index e389357d9..904efc5d3 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/MonitorRegistry.java
@@ -15,7 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.util;
+package org.apache.eventmesh.openconnect.api.monitor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import lombok.Getter;
+
+/**
+ * Static registry holding every {@link Monitor} that has been registered.
+ */
+public class MonitorRegistry {
+
+    // Backed by a plain ArrayList with no synchronization; Lombok's @Getter exposes this
+    // same list instance through the generated accessor.
+    @Getter
+    private static final List<Monitor> monitors = new ArrayList<>();
+
+    /**
+     * Appends the given monitor to the registry; no null or duplicate check is performed.
+     *
+     * @param monitor monitor to register
+     */
+    public static void registerMonitor(Monitor monitor) {
+        monitors.add(monitor);
+    }
 
-public class RuntimeUtils {
 }
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java
index 42745c8dd..088152187 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java
@@ -40,7 +40,6 @@ public class RuntimeInstanceStarter {
                     long start = System.currentTimeMillis();
                     runtimeInstance.shutdown();
                     long end = System.currentTimeMillis();
-
                     log.info("runtime shutdown cost {}ms", end - start);
                 } catch (Exception e) {
                     log.error("exception when shutdown {}", e.getMessage(), e);
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
index 3d3c864b5..92e78256e 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
@@ -33,9 +33,6 @@ import 
org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
 import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
 import org.apache.eventmesh.common.remote.JobState;
 import org.apache.eventmesh.common.remote.request.FetchJobRequest;
-import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
-import org.apache.eventmesh.common.remote.request.ReportJobRequest;
-import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
 import org.apache.eventmesh.common.remote.response.FetchJobResponse;
 import org.apache.eventmesh.common.utils.IPUtils;
 import org.apache.eventmesh.common.utils.JsonUtils;
@@ -57,33 +54,34 @@ import 
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageWrit
 import org.apache.eventmesh.openconnect.util.ConfigUtil;
 import org.apache.eventmesh.runtime.Runtime;
 import org.apache.eventmesh.runtime.RuntimeInstanceConfig;
+import org.apache.eventmesh.runtime.service.health.HealthService;
+import org.apache.eventmesh.runtime.service.monitor.MonitorService;
+import org.apache.eventmesh.runtime.service.monitor.SinkMonitor;
+import org.apache.eventmesh.runtime.service.monitor.SourceMonitor;
+import org.apache.eventmesh.runtime.service.status.StatusService;
+import org.apache.eventmesh.runtime.service.verify.VerifyService;
+import org.apache.eventmesh.runtime.util.RuntimeUtils;
 import org.apache.eventmesh.spi.EventMeshExtensionFactory;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
-import io.grpc.stub.StreamObserver;
 
 import com.google.protobuf.Any;
 import com.google.protobuf.UnsafeByteOperations;
@@ -103,10 +101,6 @@ public class ConnectorRuntime implements Runtime {
 
     private AdminServiceBlockingStub adminServiceBlockingStub;
 
-    StreamObserver<Payload> responseObserver;
-
-    StreamObserver<Payload> requestObserver;
-
     private Source sourceConnector;
 
     private Sink sinkConnector;
@@ -129,9 +123,6 @@ public class ConnectorRuntime implements Runtime {
 
     private final ExecutorService sinkService = 
ThreadPoolFactory.createSingleExecutor("eventMesh-sinkService");
 
-    private final ScheduledExecutorService heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor();
-
-    private final ExecutorService reportVerifyExecutor = 
Executors.newSingleThreadExecutor();
 
     private final BlockingQueue<ConnectRecord> queue;
 
@@ -143,6 +134,18 @@ public class ConnectorRuntime implements Runtime {
 
     private String adminServerAddr;
 
+    private HealthService healthService;
+
+    private MonitorService monitorService;
+
+    private SourceMonitor sourceMonitor;
+
+    private SinkMonitor sinkMonitor;
+
+    private VerifyService verifyService;
+
+    private StatusService statusService;
+
 
     public ConnectorRuntime(RuntimeInstanceConfig runtimeInstanceConfig) {
         this.runtimeInstanceConfig = runtimeInstanceConfig;
@@ -156,46 +159,31 @@ public class ConnectorRuntime implements Runtime {
 
         initStorageService();
 
+        initStatusService();
+
         initConnectorService();
+
+        initMonitorService();
+
+        initHealthService();
+
+        initVerfiyService();
+
     }
 
     private void initAdminService() {
-        adminServerAddr = 
getRandomAdminServerAddr(runtimeInstanceConfig.getAdminServiceAddr());
+        adminServerAddr = 
RuntimeUtils.getRandomAdminServerAddr(runtimeInstanceConfig.getAdminServiceAddr());
         // create gRPC channel
-        channel = 
ManagedChannelBuilder.forTarget(adminServerAddr).usePlaintext().build();
+        channel = ManagedChannelBuilder.forTarget(adminServerAddr)
+            .usePlaintext()
+            .enableRetry()
+            .maxRetryAttempts(3)
+            .build();
 
         adminServiceStub = 
AdminServiceGrpc.newStub(channel).withWaitForReady();
 
         adminServiceBlockingStub = 
AdminServiceGrpc.newBlockingStub(channel).withWaitForReady();
 
-        responseObserver = new StreamObserver<Payload>() {
-            @Override
-            public void onNext(Payload response) {
-                log.info("runtime receive message: {} ", response);
-            }
-
-            @Override
-            public void onError(Throwable t) {
-                log.error("runtime receive error message: {}", t.getMessage());
-            }
-
-            @Override
-            public void onCompleted() {
-                log.info("runtime finished receive message and completed");
-            }
-        };
-
-        requestObserver = adminServiceStub.invokeBiStream(responseObserver);
-    }
-
-    private String getRandomAdminServerAddr(String adminServerAddrList) {
-        String[] addresses = adminServerAddrList.split(";");
-        if (addresses.length == 0) {
-            throw new IllegalArgumentException("Admin server address list is 
empty");
-        }
-        Random random = new Random();
-        int randomIndex = random.nextInt(addresses.length);
-        return addresses[randomIndex];
     }
 
     private void initStorageService() {
@@ -206,11 +194,16 @@ public class ConnectorRuntime implements Runtime {
 
     }
 
+    private void initStatusService() {
+        statusService = new StatusService(adminServiceStub, 
adminServiceBlockingStub);
+    }
+
     private void initConnectorService() throws Exception {
 
         connectorRuntimeConfig = 
ConfigService.getInstance().buildConfigInstance(ConnectorRuntimeConfig.class);
 
         FetchJobResponse jobResponse = fetchJobConfig();
+        log.info("fetch job config from admin server: {}", 
JsonUtils.toJSONString(jobResponse));
 
         if (jobResponse == null) {
             isFailed = true;
@@ -271,7 +264,7 @@ public class ConnectorRuntime implements Runtime {
         sinkConnectorContext.setJobType(jobResponse.getType());
         sinkConnector.init(sinkConnectorContext);
 
-        reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.INIT);
+        statusService.reportJobStatus(connectorRuntimeConfig.getJobID(), 
JobState.INIT);
 
     }
 
@@ -292,27 +285,31 @@ public class ConnectorRuntime implements Runtime {
         return null;
     }
 
-    @Override
-    public void start() throws Exception {
+    private void initMonitorService() {
+        monitorService = new MonitorService(adminServiceStub, 
adminServiceBlockingStub);
+        sourceMonitor = new SourceMonitor(connectorRuntimeConfig.getTaskID(), 
connectorRuntimeConfig.getJobID(), IPUtils.getLocalAddress());
+        monitorService.registerMonitor(sourceMonitor);
+        sinkMonitor = new SinkMonitor(connectorRuntimeConfig.getTaskID(), 
connectorRuntimeConfig.getJobID(), IPUtils.getLocalAddress());
+        monitorService.registerMonitor(sinkMonitor);
+    }
 
-        heartBeatExecutor.scheduleAtFixedRate(() -> {
+    private void initHealthService() {
+        healthService = new HealthService(adminServiceStub, 
adminServiceBlockingStub, connectorRuntimeConfig);
+    }
 
-            ReportHeartBeatRequest heartBeat = new ReportHeartBeatRequest();
-            heartBeat.setAddress(IPUtils.getLocalAddress());
-            
heartBeat.setReportedTimeStamp(String.valueOf(System.currentTimeMillis()));
-            heartBeat.setJobID(connectorRuntimeConfig.getJobID());
+    private void initVerfiyService() {
+        verifyService = new VerifyService(adminServiceStub, 
adminServiceBlockingStub, connectorRuntimeConfig);
+    }
 
-            Metadata metadata = 
Metadata.newBuilder().setType(ReportHeartBeatRequest.class.getSimpleName()).build();
+    @Override
+    public void start() throws Exception {
+        // start offsetMgmtService
+        offsetManagementService.start();
 
-            Payload request = Payload.newBuilder().setMetadata(metadata)
-                
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(heartBeat)))).build())
-                .build();
+        monitorService.start();
 
-            requestObserver.onNext(request);
-        }, 5, 5, TimeUnit.SECONDS);
+        healthService.start();
 
-        // start offsetMgmtService
-        offsetManagementService.start();
         isRunning = true;
         // start sinkService
         sinkService.execute(() -> {
@@ -320,32 +317,34 @@ public class ConnectorRuntime implements Runtime {
                 startSinkConnector();
             } catch (Exception e) {
                 isFailed = true;
-                log.error("sink connector [{}] start fail", 
sinkConnector.name(), e);
+                log.error("sink connector start fail", e.getStackTrace());
                 try {
                     this.stop();
                 } catch (Exception ex) {
                     log.error("Failed to stop after exception", ex);
                 }
-                throw new RuntimeException(e);
+            } finally {
+                System.exit(-1);
             }
         });
-        // start
+        // start sourceService
         sourceService.execute(() -> {
             try {
                 startSourceConnector();
             } catch (Exception e) {
                 isFailed = true;
-                log.error("source connector [{}] start fail", 
sourceConnector.name(), e);
+                log.error("source connector start fail", e);
                 try {
                     this.stop();
                 } catch (Exception ex) {
                     log.error("Failed to stop after exception", ex);
                 }
-                throw new RuntimeException(e);
+            } finally {
+                System.exit(-1);
             }
         });
 
-        reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.RUNNING);
+        statusService.reportJobStatus(connectorRuntimeConfig.getJobID(), 
JobState.RUNNING);
     }
 
     @Override
@@ -353,26 +352,30 @@ public class ConnectorRuntime implements Runtime {
         log.info("ConnectorRuntime start stop");
         isRunning = false;
         if (isFailed) {
-            reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.FAIL);
+            statusService.reportJobStatus(connectorRuntimeConfig.getJobID(), 
JobState.FAIL);
         } else {
-            reportJobRequest(connectorRuntimeConfig.getJobID(), 
JobState.COMPLETE);
+            statusService.reportJobStatus(connectorRuntimeConfig.getJobID(), 
JobState.COMPLETE);
         }
         sourceConnector.stop();
         sinkConnector.stop();
+        monitorService.stop();
+        healthService.stop();
         sourceService.shutdown();
         sinkService.shutdown();
-        heartBeatExecutor.shutdown();
-        reportVerifyExecutor.shutdown();
-        requestObserver.onCompleted();
+        verifyService.stop();
+        statusService.stop();
         if (channel != null && !channel.isShutdown()) {
-            channel.shutdown();
+            channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
         }
+        log.info("ConnectorRuntime stopped");
     }
 
     private void startSourceConnector() throws Exception {
         sourceConnector.start();
         while (isRunning) {
+            long sourceStartTime = System.currentTimeMillis();
             List<ConnectRecord> connectorRecordList = sourceConnector.poll();
+            long sinkStartTime = System.currentTimeMillis();
             // TODO: use producer pub record to storage replace below
             if (connectorRecordList != null && !connectorRecordList.isEmpty()) 
{
                 for (ConnectRecord record : connectorRecordList) {
@@ -381,19 +384,14 @@ public class ConnectorRuntime implements Runtime {
                         record.addExtension("recordUniqueId", 
record.getRecordId());
                     }
 
-                    queue.put(record);
-
-                    // if enabled incremental data reporting consistency check
-                    if 
(connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) {
-                        reportVerifyRequest(record, connectorRuntimeConfig, 
ConnectorStage.SOURCE);
-                    }
-
                     // set a callback for this record
                     // if used the memory storage callback will be triggered 
after sink put success
                     record.setCallback(new SendMessageCallback() {
                         @Override
                         public void onSuccess(SendResult result) {
                             log.debug("send record to sink callback success, 
record: {}", record);
+                            long sinkEndTime = System.currentTimeMillis();
+                            sinkMonitor.recordProcess(sinkEndTime - 
sinkStartTime);
                             // commit record
                             sourceConnector.commit(record);
                             if (record.getPosition() != null) {
@@ -424,6 +422,16 @@ public class ConnectorRuntime implements Runtime {
                             }
                         }
                     });
+
+                    queue.put(record);
+                    long sourceEndTime = System.currentTimeMillis();
+                    sourceMonitor.recordProcess(sourceEndTime - 
sourceStartTime);
+
+                    // if enabled incremental data reporting consistency check
+                    if 
(connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) {
+                        verifyService.reportVerifyRequest(record, 
ConnectorStage.SOURCE);
+                    }
+
                 }
             }
         }
@@ -438,64 +446,6 @@ public class ConnectorRuntime implements Runtime {
         return result;
     }
 
-    private void reportVerifyRequest(ConnectRecord record, 
ConnectorRuntimeConfig connectorRuntimeConfig, ConnectorStage connectorStage) {
-        reportVerifyExecutor.submit(() -> {
-            try {
-                // use record data + recordUniqueId for md5
-                String md5Str = md5(record.getData().toString() + 
record.getExtension("recordUniqueId"));
-                ReportVerifyRequest reportVerifyRequest = new 
ReportVerifyRequest();
-                
reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID());
-                
reportVerifyRequest.setJobID(connectorRuntimeConfig.getJobID());
-                reportVerifyRequest.setRecordID(record.getRecordId());
-                reportVerifyRequest.setRecordSig(md5Str);
-                reportVerifyRequest.setConnectorName(
-                    IPUtils.getLocalAddress() + "_" + 
connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion());
-                reportVerifyRequest.setConnectorStage(connectorStage.name());
-                
reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition()));
-
-                Metadata metadata = 
Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build();
-
-                Payload request = Payload.newBuilder().setMetadata(metadata)
-                    .setBody(
-                        
Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest))))
-                            .build())
-                    .build();
-
-                requestObserver.onNext(request);
-            } catch (Exception e) {
-                log.error("Failed to report verify request", e);
-            }
-        });
-    }
-
-    private void reportJobRequest(String jobId, JobState jobState) throws 
InterruptedException {
-        ReportJobRequest reportJobRequest = new ReportJobRequest();
-        reportJobRequest.setJobID(jobId);
-        reportJobRequest.setState(jobState);
-        Metadata metadata = Metadata.newBuilder()
-            .setType(ReportJobRequest.class.getSimpleName())
-            .build();
-        Payload payload = Payload.newBuilder()
-            .setMetadata(metadata)
-            
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportJobRequest))))
-                .build())
-            .build();
-        requestObserver.onNext(payload);
-    }
-
-    private String md5(String input) {
-        try {
-            MessageDigest md = MessageDigest.getInstance("MD5");
-            byte[] messageDigest = md.digest(input.getBytes());
-            StringBuilder sb = new StringBuilder();
-            for (byte b : messageDigest) {
-                sb.append(String.format("%02x", b));
-            }
-            return sb.toString();
-        } catch (NoSuchAlgorithmException e) {
-            throw new RuntimeException(e);
-        }
-    }
 
     public Optional<RecordOffsetManagement.SubmittedPosition> 
prepareToUpdateRecordOffset(ConnectRecord record) {
         return 
Optional.of(this.offsetManagement.submitRecord(record.getPosition()));
@@ -589,7 +539,7 @@ public class ConnectorRuntime implements Runtime {
             sinkConnector.put(connectRecordList);
             // if enabled incremental data reporting consistency check
             if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) {
-                reportVerifyRequest(connectRecord, connectorRuntimeConfig, 
ConnectorStage.SINK);
+                verifyService.reportVerifyRequest(connectRecord, 
ConnectorStage.SINK);
             }
         }
     }
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/health/HealthService.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/health/HealthService.java
new file mode 100644
index 000000000..54f924874
--- /dev/null
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/health/HealthService.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.service.health;
+
+import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
+import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
+import org.apache.eventmesh.common.utils.IPUtils;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.runtime.connector.ConnectorRuntimeConfig;
+
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.grpc.stub.StreamObserver;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.UnsafeByteOperations;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Sends a periodic heartbeat for the configured job to the admin server over a
+ * bidirectional gRPC stream.
+ *
+ * <p>The stream is opened in the constructor; {@link #start()} schedules the heartbeat and
+ * {@link #stop()} shuts the scheduler down and completes the request stream.
+ */
+@Slf4j
+public class HealthService {
+
+    private final ScheduledExecutorService scheduler;
+
+    private StreamObserver<Payload> requestObserver;
+
+    private StreamObserver<Payload> responseObserver;
+
+    private AdminServiceGrpc.AdminServiceStub adminServiceStub;
+
+    private AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub;
+
+    private ConnectorRuntimeConfig connectorRuntimeConfig;
+
+
+    /**
+     * Creates the service, its single-thread scheduler and the bidirectional stream used for
+     * heartbeats. Responses and stream termination are only logged.
+     *
+     * @param adminServiceStub         async stub used to open the bidirectional stream
+     * @param adminServiceBlockingStub blocking stub; stored but not used by this class
+     * @param connectorRuntimeConfig   supplies the job id placed in every heartbeat
+     */
+    public HealthService(AdminServiceGrpc.AdminServiceStub adminServiceStub, AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub,
+                         ConnectorRuntimeConfig connectorRuntimeConfig) {
+        this.adminServiceStub = adminServiceStub;
+        this.adminServiceBlockingStub = adminServiceBlockingStub;
+        this.connectorRuntimeConfig = connectorRuntimeConfig;
+
+        this.scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        responseObserver = new StreamObserver<Payload>() {
+            @Override
+            public void onNext(Payload response) {
+                log.debug("health service receive message: {}|{} ", response.getMetadata(), response.getBody());
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                log.error("health service receive error message: {}", t.getMessage());
+            }
+
+            @Override
+            public void onCompleted() {
+                log.info("health service finished receive message and completed");
+            }
+        };
+        requestObserver = this.adminServiceStub.invokeBiStream(responseObserver);
+    }
+
+    /**
+     * Starts the periodic heartbeat.
+     */
+    public void start() {
+        this.healthReport();
+    }
+
+    /**
+     * Schedules a heartbeat every 5 seconds, starting 5 seconds after this call. Each
+     * heartbeat carries the local address, the current timestamp and the job id.
+     */
+    public void healthReport() {
+        scheduler.scheduleAtFixedRate(() -> {
+            // scheduleAtFixedRate cancels all further runs once the task throws, so any
+            // failure is caught and logged here to keep the heartbeat alive.
+            try {
+                ReportHeartBeatRequest heartBeat = new ReportHeartBeatRequest();
+                heartBeat.setAddress(IPUtils.getLocalAddress());
+                heartBeat.setReportedTimeStamp(String.valueOf(System.currentTimeMillis()));
+                heartBeat.setJobID(connectorRuntimeConfig.getJobID());
+
+                Metadata metadata = Metadata.newBuilder().setType(ReportHeartBeatRequest.class.getSimpleName()).build();
+
+                Payload request = Payload.newBuilder().setMetadata(metadata)
+                    .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(heartBeat)))).build())
+                    .build();
+
+                requestObserver.onNext(request);
+            } catch (Exception e) {
+                log.error("health service report heartbeat failed", e);
+            }
+        }, 5, 5, TimeUnit.SECONDS);
+    }
+
+
+    /**
+     * Stops scheduling new heartbeats and completes the request stream.
+     */
+    public void stop() {
+        scheduler.shutdown();
+        if (requestObserver != null) {
+            requestObserver.onCompleted();
+        }
+    }
+
+}
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/MonitorService.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/MonitorService.java
new file mode 100644
index 000000000..f5af7596c
--- /dev/null
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/MonitorService.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.service.monitor;
+
+import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
+import org.apache.eventmesh.common.remote.request.ReportMonitorRequest;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.openconnect.api.monitor.Monitor;
+import org.apache.eventmesh.openconnect.api.monitor.MonitorRegistry;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.grpc.stub.StreamObserver;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.UnsafeByteOperations;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Periodically prints the metrics of every registered {@link Monitor} and reports them to the admin
+ * service over a bidirectional gRPC stream opened in the constructor.
+ */
+@Slf4j
+public class MonitorService {
+
+    private final ScheduledExecutorService scheduler;
+
+    /** Outbound side of the stream; monitor reports are pushed through it. */
+    private StreamObserver<Payload> requestObserver;
+
+    /** Inbound side of the stream; it only logs whatever the admin service sends back. */
+    private StreamObserver<Payload> responseObserver;
+
+    private AdminServiceGrpc.AdminServiceStub adminServiceStub;
+
+    /** Stored for callers' convenience; not used by any method of this class. */
+    private AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub;
+
+
+    /**
+     * Creates the service, a single-threaded scheduler and the bidirectional stream to the admin service.
+     *
+     * @param adminServiceStub         async stub used to open the stream
+     * @param adminServiceBlockingStub blocking stub, only stored
+     */
+    public MonitorService(AdminServiceGrpc.AdminServiceStub adminServiceStub, AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub) {
+        this.adminServiceStub = adminServiceStub;
+        this.adminServiceBlockingStub = adminServiceBlockingStub;
+
+        this.scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        responseObserver = new StreamObserver<Payload>() {
+            @Override
+            public void onNext(Payload response) {
+                log.debug("monitor service receive message: {}|{} ", response.getMetadata(), response.getBody());
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                log.error("monitor service receive error message: {}", t.getMessage());
+            }
+
+            @Override
+            public void onCompleted() {
+                log.info("monitor service finished receive message and completed");
+            }
+        };
+        requestObserver = this.adminServiceStub.invokeBiStream(responseObserver);
+    }
+
+    /**
+     * Registers a monitor with the shared {@link MonitorRegistry} so that it is included in later reports.
+     *
+     * @param monitor monitor to register
+     */
+    public void registerMonitor(Monitor monitor) {
+        MonitorRegistry.registerMonitor(monitor);
+    }
+
+    /**
+     * Starts the periodic reporting.
+     */
+    public void start() {
+        this.startReporting();
+    }
+
+    /**
+     * Schedules the report task: first run after 5 seconds, then every 30 seconds.
+     *
+     * <p>An exception escaping a fixed-rate task suppresses every later execution, so failures are
+     * contained: one monitor that cannot be reported does not stop the others or the following rounds.
+     */
+    public void startReporting() {
+        scheduler.scheduleAtFixedRate(() -> {
+            try {
+                List<Monitor> monitors = MonitorRegistry.getMonitors();
+                for (Monitor monitor : monitors) {
+                    reportQuietly(monitor);
+                }
+            } catch (Exception e) {
+                log.error("monitor service failed to run report round", e);
+            }
+        }, 5, 30, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Prints and reports a single monitor, logging instead of propagating any failure.
+     */
+    private void reportQuietly(Monitor monitor) {
+        try {
+            monitor.printMetrics();
+            reportToAdminService(monitor);
+        } catch (Exception e) {
+            log.error("monitor service failed to report monitor: {}", monitor, e);
+        }
+    }
+
+    /**
+     * Copies the metrics of a source or sink monitor into a {@link ReportMonitorRequest} and sends it,
+     * JSON-encoded inside a {@link Payload}, through the request stream.
+     *
+     * @param monitor monitor to report; must be a {@link SourceMonitor} or a {@link SinkMonitor}
+     * @throws IllegalArgumentException if the monitor is of any other type
+     */
+    private void reportToAdminService(Monitor monitor) {
+        ReportMonitorRequest request = new ReportMonitorRequest();
+        if (monitor instanceof SourceMonitor) {
+            SourceMonitor sourceMonitor = (SourceMonitor) monitor;
+            request.setTaskID(sourceMonitor.getTaskId());
+            request.setJobID(sourceMonitor.getJobId());
+            request.setAddress(sourceMonitor.getIp());
+            request.setConnectorStage(sourceMonitor.getConnectorStage());
+            request.setTotalReqNum(sourceMonitor.getTotalRecordNum().longValue());
+            request.setTotalTimeCost(sourceMonitor.getTotalTimeCost().longValue());
+            request.setMaxTimeCost(sourceMonitor.getMaxTimeCost().longValue());
+            request.setAvgTimeCost(sourceMonitor.getAverageTime());
+            request.setTps(sourceMonitor.getTps());
+        } else if (monitor instanceof SinkMonitor) {
+            SinkMonitor sinkMonitor = (SinkMonitor) monitor;
+            request.setTaskID(sinkMonitor.getTaskId());
+            request.setJobID(sinkMonitor.getJobId());
+            request.setAddress(sinkMonitor.getIp());
+            request.setConnectorStage(sinkMonitor.getConnectorStage());
+            request.setTotalReqNum(sinkMonitor.getTotalRecordNum().longValue());
+            request.setTotalTimeCost(sinkMonitor.getTotalTimeCost().longValue());
+            request.setMaxTimeCost(sinkMonitor.getMaxTimeCost().longValue());
+            request.setAvgTimeCost(sinkMonitor.getAverageTime());
+            request.setTps(sinkMonitor.getTps());
+        } else {
+            throw new IllegalArgumentException("Unsupported monitor: " + monitor);
+        }
+
+        Metadata metadata = Metadata.newBuilder()
+            .setType(ReportMonitorRequest.class.getSimpleName())
+            .build();
+        Payload payload = Payload.newBuilder()
+            .setMetadata(metadata)
+            .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(request))))
+                .build())
+            .build();
+        requestObserver.onNext(payload);
+    }
+
+    /**
+     * Shuts down the scheduler and, when a request stream exists, signals its completion.
+     */
+    public void stop() {
+        scheduler.shutdown();
+        if (requestObserver != null) {
+            requestObserver.onCompleted();
+        }
+    }
+
+}
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SinkMonitor.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SinkMonitor.java
new file mode 100644
index 000000000..b27b44da7
--- /dev/null
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SinkMonitor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.service.monitor;
+
+import org.apache.eventmesh.common.enums.ConnectorStage;
+import org.apache.eventmesh.openconnect.api.monitor.AbstractConnectorMonitor;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Connector monitor for the sink stage. All metric bookkeeping is delegated to
+ * {@link AbstractConnectorMonitor}; this class only contributes the stage name.
+ */
+@Slf4j
+@Getter
+@Setter
+public class SinkMonitor extends AbstractConnectorMonitor {
+
+    /** Stage name reported for this monitor; initialised to {@code ConnectorStage.SINK.name()}. */
+    private String connectorStage;
+
+    /**
+     * Creates a sink monitor.
+     *
+     * @param taskId task id passed to the parent monitor
+     * @param jobId  job id passed to the parent monitor
+     * @param ip     ip passed to the parent monitor
+     */
+    public SinkMonitor(String taskId, String jobId, String ip) {
+        super(taskId, jobId, ip);
+        this.connectorStage = ConnectorStage.SINK.name();
+    }
+
+    /** Delegates unchanged to the parent implementation. */
+    @Override
+    public void recordProcess(long timeCost) {
+        super.recordProcess(timeCost);
+    }
+
+    /** Delegates unchanged to the parent implementation. */
+    @Override
+    public void recordProcess(int recordCount, long timeCost) {
+        super.recordProcess(recordCount, timeCost);
+    }
+
+    /** Delegates unchanged to the parent implementation. */
+    @Override
+    public void printMetrics() {
+        super.printMetrics();
+    }
+}
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SourceMonitor.java
similarity index 51%
copy from 
eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
copy to 
eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SourceMonitor.java
index e389357d9..3895c8df1 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SourceMonitor.java
@@ -15,7 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.util;
+package org.apache.eventmesh.runtime.service.monitor;
 
-public class RuntimeUtils {
+import org.apache.eventmesh.common.enums.ConnectorStage;
+import org.apache.eventmesh.openconnect.api.monitor.AbstractConnectorMonitor;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Connector monitor for the source stage. All metric bookkeeping is delegated to
+ * {@link AbstractConnectorMonitor}; this class only contributes the stage name.
+ */
+@Slf4j
+@Getter
+@Setter
+public class SourceMonitor extends AbstractConnectorMonitor {
+
+    /** Stage name reported for this monitor; initialised to {@code ConnectorStage.SOURCE.name()}. */
+    private String connectorStage;
+
+    /**
+     * Creates a source monitor.
+     *
+     * @param taskId task id passed to the parent monitor
+     * @param jobId  job id passed to the parent monitor
+     * @param ip     ip passed to the parent monitor
+     */
+    public SourceMonitor(String taskId, String jobId, String ip) {
+        super(taskId, jobId, ip);
+        this.connectorStage = ConnectorStage.SOURCE.name();
+    }
+
+    /** Delegates unchanged to the parent implementation. */
+    @Override
+    public void recordProcess(int recordCount, long timeCost) {
+        super.recordProcess(recordCount, timeCost);
+    }
+
+    /** Delegates unchanged to the parent implementation. */
+    @Override
+    public void printMetrics() {
+        super.printMetrics();
+    }
 }
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/status/StatusService.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/status/StatusService.java
new file mode 100644
index 000000000..e40686f57
--- /dev/null
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/status/StatusService.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.service.status;
+
+import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
+import org.apache.eventmesh.common.remote.JobState;
+import org.apache.eventmesh.common.remote.request.ReportJobRequest;
+import org.apache.eventmesh.common.utils.IPUtils;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import java.util.Objects;
+
+import io.grpc.stub.StreamObserver;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.UnsafeByteOperations;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Reports job state changes to the admin service over a bidirectional gRPC stream opened in the
+ * constructor.
+ */
+@Slf4j
+public class StatusService {
+
+    /** Outbound side of the stream; job status reports are pushed through it. */
+    private StreamObserver<Payload> requestObserver;
+
+    /** Inbound side of the stream; it only logs whatever the admin service sends back. */
+    private StreamObserver<Payload> responseObserver;
+
+    private AdminServiceGrpc.AdminServiceStub adminServiceStub;
+
+    /** Stored for callers' convenience; not used by any method of this class. */
+    private AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub;
+
+
+    /**
+     * Creates the service and opens the bidirectional stream to the admin service.
+     *
+     * @param adminServiceStub         async stub used to open the stream
+     * @param adminServiceBlockingStub blocking stub, only stored
+     */
+    public StatusService(AdminServiceGrpc.AdminServiceStub adminServiceStub, AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub) {
+        this.adminServiceStub = adminServiceStub;
+        this.adminServiceBlockingStub = adminServiceBlockingStub;
+
+        // Log lines name this service explicitly so stream problems are not attributed to the health service.
+        responseObserver = new StreamObserver<Payload>() {
+            @Override
+            public void onNext(Payload response) {
+                log.debug("status service receive message: {}|{} ", response.getMetadata(), response.getBody());
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                log.error("status service receive error message: {}", t.getMessage());
+            }
+
+            @Override
+            public void onCompleted() {
+                log.info("status service finished receive message and completed");
+            }
+        };
+        requestObserver = this.adminServiceStub.invokeBiStream(responseObserver);
+    }
+
+    /**
+     * Sends a {@link ReportJobRequest} with the given job id, the given state and the local address,
+     * JSON-encoded inside a {@link Payload}, through the request stream.
+     *
+     * @param jobId    id of the job being reported
+     * @param jobState state to report for the job
+     */
+    public void reportJobStatus(String jobId, JobState jobState) {
+        ReportJobRequest reportJobRequest = new ReportJobRequest();
+        reportJobRequest.setJobID(jobId);
+        reportJobRequest.setState(jobState);
+        reportJobRequest.setAddress(IPUtils.getLocalAddress());
+        Metadata metadata = Metadata.newBuilder()
+                .setType(ReportJobRequest.class.getSimpleName())
+                .build();
+        Payload payload = Payload.newBuilder()
+                .setMetadata(metadata)
+                .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportJobRequest))))
+                        .build())
+                .build();
+        log.info("report job state request: {}", JsonUtils.toJSONString(reportJobRequest));
+        requestObserver.onNext(payload);
+    }
+
+    /**
+     * Signals completion of the request stream when one exists.
+     */
+    public void stop() {
+        if (requestObserver != null) {
+            requestObserver.onCompleted();
+        }
+    }
+}
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/verify/VerifyService.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/verify/VerifyService.java
new file mode 100644
index 000000000..8bcb72199
--- /dev/null
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/verify/VerifyService.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.service.verify;
+
+import org.apache.eventmesh.common.enums.ConnectorStage;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
+import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
+import org.apache.eventmesh.common.utils.IPUtils;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.runtime.connector.ConnectorRuntimeConfig;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import io.grpc.stub.StreamObserver;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.UnsafeByteOperations;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Reports a per-record verification signature to the admin service over a bidirectional gRPC stream
+ * opened in the constructor. Reports are built and sent on a dedicated single-thread executor.
+ */
+@Slf4j
+public class VerifyService {
+
+    private final ExecutorService reportVerifyExecutor;
+
+    /** Outbound side of the stream; verify reports are pushed through it. */
+    private StreamObserver<Payload> requestObserver;
+
+    /** Inbound side of the stream; it only logs whatever the admin service sends back. */
+    private StreamObserver<Payload> responseObserver;
+
+    private AdminServiceGrpc.AdminServiceStub adminServiceStub;
+
+    /** Stored for callers' convenience; not used by any method of this class. */
+    private AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub;
+
+    private ConnectorRuntimeConfig connectorRuntimeConfig;
+
+
+    /**
+     * Creates the service, its single-thread executor and the bidirectional stream to the admin service.
+     *
+     * @param adminServiceStub         async stub used to open the stream
+     * @param adminServiceBlockingStub blocking stub, only stored
+     * @param connectorRuntimeConfig   source of the task id, job id and region put into each report
+     */
+    public VerifyService(AdminServiceGrpc.AdminServiceStub adminServiceStub, AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub,
+                         ConnectorRuntimeConfig connectorRuntimeConfig) {
+        this.adminServiceStub = adminServiceStub;
+        this.adminServiceBlockingStub = adminServiceBlockingStub;
+        this.connectorRuntimeConfig = connectorRuntimeConfig;
+
+        this.reportVerifyExecutor = Executors.newSingleThreadExecutor();
+
+        responseObserver = new StreamObserver<Payload>() {
+            @Override
+            public void onNext(Payload response) {
+                log.debug("verify service receive message: {}|{} ", response.getMetadata(), response.getBody());
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                log.error("verify service receive error message: {}", t.getMessage());
+            }
+
+            @Override
+            public void onCompleted() {
+                log.info("verify service finished receive message and completed");
+            }
+        };
+        requestObserver = this.adminServiceStub.invokeBiStream(responseObserver);
+    }
+
+    /**
+     * Asynchronously builds and sends a {@link ReportVerifyRequest} for the given record.
+     *
+     * <p>The record signature is the MD5 of {@code Arrays.toString(data)} concatenated with the record's
+     * {@code recordUniqueId} extension. Any failure while building or sending the report (including the
+     * record data not being a {@code byte[]}) is logged and does not propagate to the caller.
+     *
+     * @param record         record to report
+     * @param connectorStage stage whose name is put into the report
+     */
+    public void reportVerifyRequest(ConnectRecord record, ConnectorStage connectorStage) {
+        reportVerifyExecutor.submit(() -> {
+            try {
+                byte[] data = (byte[]) record.getData();
+                // use record data + recordUniqueId for md5
+                String md5Str = md5(Arrays.toString(data) + record.getExtension("recordUniqueId"));
+                ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest();
+                reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID());
+                reportVerifyRequest.setJobID(connectorRuntimeConfig.getJobID());
+                reportVerifyRequest.setRecordID(record.getExtension("recordUniqueId"));
+                reportVerifyRequest.setRecordSig(md5Str);
+                reportVerifyRequest.setConnectorName(
+                    IPUtils.getLocalAddress() + "_" + connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion());
+                reportVerifyRequest.setConnectorStage(connectorStage.name());
+                reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition()));
+
+                Metadata metadata = Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build();
+
+                Payload request = Payload.newBuilder().setMetadata(metadata)
+                    .setBody(
+                        Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest))))
+                            .build())
+                    .build();
+                requestObserver.onNext(request);
+            } catch (Exception e) {
+                log.error("Failed to report verify request", e);
+            }
+        });
+    }
+
+    /**
+     * Returns the lower-case hexadecimal MD5 digest of the input.
+     *
+     * <p>The input is encoded as UTF-8 rather than with the platform default charset, so that runtimes
+     * on differently configured hosts produce the same signature for the same record.
+     *
+     * @param input text to digest
+     * @return 32-character hexadecimal digest
+     * @throws RuntimeException if the MD5 algorithm is not available
+     */
+    private String md5(String input) {
+        try {
+            MessageDigest md = MessageDigest.getInstance("MD5");
+            byte[] messageDigest = md.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            StringBuilder sb = new StringBuilder();
+            for (byte b : messageDigest) {
+                sb.append(String.format("%02x", b));
+            }
+            return sb.toString();
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Shuts down the report executor and, when a request stream exists, signals its completion.
+     */
+    public void stop() {
+        reportVerifyExecutor.shutdown();
+        if (requestObserver != null) {
+            requestObserver.onCompleted();
+        }
+    }
+
+}
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
index e389357d9..844a9638a 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
@@ -17,5 +17,18 @@
 
 package org.apache.eventmesh.runtime.util;
 
+import java.util.Random;
+
 public class RuntimeUtils {
+
+    /**
+     * Picks one admin server address at random from a semicolon-separated list.
+     *
+     * <p>Entries are trimmed and blank entries are ignored, so inputs such as {@code "a:1;;b:2"} or
+     * {@code " a:1 ; "} never yield an empty address.
+     *
+     * @param adminServerAddrList semicolon-separated addresses, e.g. {@code "host1:8081;host2:8081"}
+     * @return one of the non-blank addresses, trimmed
+     * @throws IllegalArgumentException if the list is {@code null} or contains no non-blank address
+     */
+    public static String getRandomAdminServerAddr(String adminServerAddrList) {
+        if (adminServerAddrList == null) {
+            throw new IllegalArgumentException("Admin server address list is empty");
+        }
+        String[] addresses = adminServerAddrList.split(";");
+        // Compact the trimmed, non-blank entries to the front of the array; the write index never
+        // overtakes the read position, so no unread entry is overwritten.
+        int count = 0;
+        for (String address : addresses) {
+            String trimmed = address.trim();
+            if (!trimmed.isEmpty()) {
+                addresses[count++] = trimmed;
+            }
+        }
+        if (count == 0) {
+            throw new IllegalArgumentException("Admin server address list is empty");
+        }
+        Random random = new Random();
+        int randomIndex = random.nextInt(count);
+        return addresses[randomIndex];
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to