This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 293a61ef0 [ISSUE #5137] update connector runtime v2 module (#5138)
293a61ef0 is described below
commit 293a61ef071da403d608bc956f67fe5469936ae1
Author: mike_xwm <[email protected]>
AuthorDate: Mon Dec 9 15:12:32 2024 +0800
[ISSUE #5137] update connector runtime v2 module (#5138)
* [ISSUE #5137] update connector runtime v2 module
* fix checkStyle error
---
.../remote/request/ReportMonitorRequest.java | 21 +-
.../api/monitor/AbstractConnectorMonitor.java | 80 +++++++
.../eventmesh/openconnect/api/monitor/Monitor.java | 15 +-
.../openconnect/api/monitor/MonitorRegistry.java | 17 +-
.../runtime/boot/RuntimeInstanceStarter.java | 1 -
.../runtime/connector/ConnectorRuntime.java | 230 ++++++++-------------
.../runtime/service/health/HealthService.java | 112 ++++++++++
.../runtime/service/monitor/MonitorService.java | 144 +++++++++++++
.../runtime/service/monitor/SinkMonitor.java | 52 +++++
.../monitor/SourceMonitor.java} | 30 ++-
.../runtime/service/status/StatusService.java | 94 +++++++++
.../runtime/service/verify/VerifyService.java | 138 +++++++++++++
.../eventmesh/runtime/util/RuntimeUtils.java | 13 ++
13 files changed, 797 insertions(+), 150 deletions(-)
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportMonitorRequest.java
similarity index 59%
copy from
eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
copy to
eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportMonitorRequest.java
index e389357d9..12278df27 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportMonitorRequest.java
@@ -15,7 +15,24 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.util;
+package org.apache.eventmesh.common.remote.request;
-public class RuntimeUtils {
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Remote request that carries connector monitoring metrics.
+ *
+ * <p>NOTE(review): the field meanings noted below are inferred from the field names and from the
+ * metrics kept by AbstractConnectorMonitor; the code that populates this request is not visible
+ * here, so confirm against the sender before relying on them.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ToString
+public class ReportMonitorRequest extends BaseRemoteRequest {
+ // presumably the identifiers of the task and job the metrics belong to - TODO confirm
+ private String taskID;
+ private String jobID;
+ // presumably the address of the reporting runtime instance - TODO confirm
+ private String address;
+ // presumably the connector stage (e.g. source or sink) that produced the metrics - TODO confirm
+ private String connectorStage;
+ private String transportType;
+ // presumably cumulative record count and time costs; AbstractConnectorMonitor logs its
+ // equivalents in milliseconds - TODO confirm units with the sender
+ private long totalReqNum;
+ private long totalTimeCost;
+ private long maxTimeCost;
+ private long avgTimeCost;
+ // presumably records per second - TODO confirm
+ private double tps;
}
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/AbstractConnectorMonitor.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/AbstractConnectorMonitor.java
new file mode 100644
index 000000000..b9205804a
--- /dev/null
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/AbstractConnectorMonitor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.openconnect.api.monitor;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Base {@link Monitor} implementation that accumulates record counts and processing-time
+ * statistics for one task/job and writes them to the log on demand.
+ *
+ * <p>The raw counters are {@link LongAdder}/{@link AtomicLong} instances. The derived values
+ * ({@code averageTime}, {@code tps}) are recomputed only inside {@link #printMetrics()}; they are
+ * declared {@code volatile} because the Lombok-generated getters read them without holding the
+ * monitor lock.
+ */
+@Slf4j
+@Getter
+public abstract class AbstractConnectorMonitor implements Monitor {
+
+    private final String taskId;
+    private final String jobId;
+    private final String ip;
+    private final LongAdder totalRecordNum;
+    private final LongAdder totalTimeCost;
+    protected final AtomicLong startTime;
+    private final AtomicLong maxTimeCost;
+    // Written under the lock in printMetrics(), read lock-free through the generated getters.
+    private volatile long averageTime = 0;
+    private volatile double tps = 0;
+
+    /**
+     * Creates a monitor whose start time is the moment of construction.
+     *
+     * @param taskId task identifier included in the metrics log line
+     * @param jobId  job identifier included in the metrics log line
+     * @param ip     address included in the metrics log line
+     */
+    public AbstractConnectorMonitor(String taskId, String jobId, String ip) {
+        this.taskId = taskId;
+        this.jobId = jobId;
+        this.ip = ip;
+        this.totalRecordNum = new LongAdder();
+        this.totalTimeCost = new LongAdder();
+        this.startTime = new AtomicLong(System.currentTimeMillis());
+        this.maxTimeCost = new AtomicLong();
+    }
+
+    /**
+     * Records that a single record was processed.
+     *
+     * @param timeCost time spent on the record; added to the total and compared against the maximum
+     */
+    @Override
+    public synchronized void recordProcess(long timeCost) {
+        totalRecordNum.increment();
+        totalTimeCost.add(timeCost);
+        maxTimeCost.accumulateAndGet(timeCost, Math::max);
+    }
+
+    /**
+     * Records that a batch of records was processed.
+     *
+     * @param recordCount number of records added to the total
+     * @param timeCost    time spent on the whole batch; added to the total and compared against the maximum
+     */
+    @Override
+    public synchronized void recordProcess(int recordCount, long timeCost) {
+        totalRecordNum.add(recordCount);
+        totalTimeCost.add(timeCost);
+        maxTimeCost.accumulateAndGet(timeCost, Math::max);
+    }
+
+    /**
+     * Recomputes the average time per record and the TPS, then logs all metrics.
+     */
+    @Override
+    public synchronized void printMetrics() {
+        // Read every counter once so the logged values and the derived figures agree with each other.
+        final long totalRecords = totalRecordNum.sum();
+        final long totalCost = totalTimeCost.sum();
+        final long maxCost = maxTimeCost.get();
+        averageTime = totalRecords > 0 ? totalCost / totalRecords : 0;
+        // Whole seconds since startTime; TPS stays 0 until at least one full second has elapsed.
+        final long elapsedTime = (System.currentTimeMillis() - startTime.get()) / 1000;
+        tps = elapsedTime > 0 ? (double) totalRecords / elapsedTime : 0;
+
+        log.info("========== Metrics ==========");
+        log.info("TaskId: {}|JobId: {}|ip: {}", taskId, jobId, ip);
+        log.info("Total records: {}", totalRecords);
+        log.info("Total time (ms): {}", totalCost);
+        log.info("Max time per record (ms): {}", maxCost);
+        log.info("Average time per record (ms): {}", averageTime);
+        log.info("TPS: {}", tps);
+    }
+}
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/Monitor.java
similarity index 73%
copy from
eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
copy to
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/Monitor.java
index e389357d9..4d4d9efb0 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/Monitor.java
@@ -15,7 +15,16 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.util;
+package org.apache.eventmesh.openconnect.api.monitor;
-public class RuntimeUtils {
-}
+/**
+ * Monitor Interface.
+ * All monitors should implement this interface.
+ */
+public interface Monitor {
+ /**
+ * Records one processing event together with the time it took.
+ * AbstractConnectorMonitor counts each call as a single record.
+ *
+ * @param timeCost time spent processing; AbstractConnectorMonitor reports the accumulated value as milliseconds
+ */
+ void recordProcess(long timeCost);
+
+ /**
+ * Records the processing of several records together with the time the whole batch took.
+ * AbstractConnectorMonitor adds recordCount to its record total.
+ *
+ * @param recordCount number of records processed
+ * @param timeCost time spent processing; AbstractConnectorMonitor reports the accumulated value as milliseconds
+ */
+ void recordProcess(int recordCount, long timeCost);
+
+ /**
+ * Outputs the metrics collected so far.
+ * AbstractConnectorMonitor writes them to its logger.
+ */
+ void printMetrics();
+}
\ No newline at end of file
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/MonitorRegistry.java
similarity index 70%
copy from
eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
copy to
eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/MonitorRegistry.java
index e389357d9..904efc5d3 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/MonitorRegistry.java
@@ -15,7 +15,20 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.util;
+package org.apache.eventmesh.openconnect.api.monitor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import lombok.Getter;
+
+/**
+ * Static, process-wide list of registered {@link Monitor} instances.
+ *
+ * <p>The backing list is a plain ArrayList and nothing in this class synchronizes access to it.
+ * NOTE(review): if monitors can be registered while another thread iterates the list, callers
+ * need their own synchronization - confirm how the list is consumed.
+ */
+public class MonitorRegistry {
+
+ // Lombok generates the accessor for this field; it hands out the list itself, not a copy.
+ @Getter
+ private static final List<Monitor> monitors = new ArrayList<>();
+
+ /**
+ * Appends the given monitor to the shared list. No null or duplicate check is performed.
+ *
+ * @param monitor monitor to register
+ */
+ public static void registerMonitor(Monitor monitor) {
+ monitors.add(monitor);
+ }
-public class RuntimeUtils {
}
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java
index 42745c8dd..088152187 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java
@@ -40,7 +40,6 @@ public class RuntimeInstanceStarter {
long start = System.currentTimeMillis();
runtimeInstance.shutdown();
long end = System.currentTimeMillis();
-
log.info("runtime shutdown cost {}ms", end - start);
} catch (Exception e) {
log.error("exception when shutdown {}", e.getMessage(), e);
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
index 3d3c864b5..92e78256e 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
@@ -33,9 +33,6 @@ import
org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
import org.apache.eventmesh.common.remote.JobState;
import org.apache.eventmesh.common.remote.request.FetchJobRequest;
-import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
-import org.apache.eventmesh.common.remote.request.ReportJobRequest;
-import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
import org.apache.eventmesh.common.remote.response.FetchJobResponse;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
@@ -57,33 +54,34 @@ import
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageWrit
import org.apache.eventmesh.openconnect.util.ConfigUtil;
import org.apache.eventmesh.runtime.Runtime;
import org.apache.eventmesh.runtime.RuntimeInstanceConfig;
+import org.apache.eventmesh.runtime.service.health.HealthService;
+import org.apache.eventmesh.runtime.service.monitor.MonitorService;
+import org.apache.eventmesh.runtime.service.monitor.SinkMonitor;
+import org.apache.eventmesh.runtime.service.monitor.SourceMonitor;
+import org.apache.eventmesh.runtime.service.status.StatusService;
+import org.apache.eventmesh.runtime.service.verify.VerifyService;
+import org.apache.eventmesh.runtime.util.RuntimeUtils;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
-import io.grpc.stub.StreamObserver;
import com.google.protobuf.Any;
import com.google.protobuf.UnsafeByteOperations;
@@ -103,10 +101,6 @@ public class ConnectorRuntime implements Runtime {
private AdminServiceBlockingStub adminServiceBlockingStub;
- StreamObserver<Payload> responseObserver;
-
- StreamObserver<Payload> requestObserver;
-
private Source sourceConnector;
private Sink sinkConnector;
@@ -129,9 +123,6 @@ public class ConnectorRuntime implements Runtime {
private final ExecutorService sinkService =
ThreadPoolFactory.createSingleExecutor("eventMesh-sinkService");
- private final ScheduledExecutorService heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor();
-
- private final ExecutorService reportVerifyExecutor =
Executors.newSingleThreadExecutor();
private final BlockingQueue<ConnectRecord> queue;
@@ -143,6 +134,18 @@ public class ConnectorRuntime implements Runtime {
private String adminServerAddr;
+ private HealthService healthService;
+
+ private MonitorService monitorService;
+
+ private SourceMonitor sourceMonitor;
+
+ private SinkMonitor sinkMonitor;
+
+ private VerifyService verifyService;
+
+ private StatusService statusService;
+
public ConnectorRuntime(RuntimeInstanceConfig runtimeInstanceConfig) {
this.runtimeInstanceConfig = runtimeInstanceConfig;
@@ -156,46 +159,31 @@ public class ConnectorRuntime implements Runtime {
initStorageService();
+ initStatusService();
+
initConnectorService();
+
+ initMonitorService();
+
+ initHealthService();
+
+ initVerfiyService();
+
}
private void initAdminService() {
- adminServerAddr =
getRandomAdminServerAddr(runtimeInstanceConfig.getAdminServiceAddr());
+ adminServerAddr =
RuntimeUtils.getRandomAdminServerAddr(runtimeInstanceConfig.getAdminServiceAddr());
// create gRPC channel
- channel =
ManagedChannelBuilder.forTarget(adminServerAddr).usePlaintext().build();
+ channel = ManagedChannelBuilder.forTarget(adminServerAddr)
+ .usePlaintext()
+ .enableRetry()
+ .maxRetryAttempts(3)
+ .build();
adminServiceStub =
AdminServiceGrpc.newStub(channel).withWaitForReady();
adminServiceBlockingStub =
AdminServiceGrpc.newBlockingStub(channel).withWaitForReady();
- responseObserver = new StreamObserver<Payload>() {
- @Override
- public void onNext(Payload response) {
- log.info("runtime receive message: {} ", response);
- }
-
- @Override
- public void onError(Throwable t) {
- log.error("runtime receive error message: {}", t.getMessage());
- }
-
- @Override
- public void onCompleted() {
- log.info("runtime finished receive message and completed");
- }
- };
-
- requestObserver = adminServiceStub.invokeBiStream(responseObserver);
- }
-
- private String getRandomAdminServerAddr(String adminServerAddrList) {
- String[] addresses = adminServerAddrList.split(";");
- if (addresses.length == 0) {
- throw new IllegalArgumentException("Admin server address list is
empty");
- }
- Random random = new Random();
- int randomIndex = random.nextInt(addresses.length);
- return addresses[randomIndex];
}
private void initStorageService() {
@@ -206,11 +194,16 @@ public class ConnectorRuntime implements Runtime {
}
+ private void initStatusService() {
+ statusService = new StatusService(adminServiceStub,
adminServiceBlockingStub);
+ }
+
private void initConnectorService() throws Exception {
connectorRuntimeConfig =
ConfigService.getInstance().buildConfigInstance(ConnectorRuntimeConfig.class);
FetchJobResponse jobResponse = fetchJobConfig();
+ log.info("fetch job config from admin server: {}",
JsonUtils.toJSONString(jobResponse));
if (jobResponse == null) {
isFailed = true;
@@ -271,7 +264,7 @@ public class ConnectorRuntime implements Runtime {
sinkConnectorContext.setJobType(jobResponse.getType());
sinkConnector.init(sinkConnectorContext);
- reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.INIT);
+ statusService.reportJobStatus(connectorRuntimeConfig.getJobID(),
JobState.INIT);
}
@@ -292,27 +285,31 @@ public class ConnectorRuntime implements Runtime {
return null;
}
- @Override
- public void start() throws Exception {
+ private void initMonitorService() {
+ monitorService = new MonitorService(adminServiceStub,
adminServiceBlockingStub);
+ sourceMonitor = new SourceMonitor(connectorRuntimeConfig.getTaskID(),
connectorRuntimeConfig.getJobID(), IPUtils.getLocalAddress());
+ monitorService.registerMonitor(sourceMonitor);
+ sinkMonitor = new SinkMonitor(connectorRuntimeConfig.getTaskID(),
connectorRuntimeConfig.getJobID(), IPUtils.getLocalAddress());
+ monitorService.registerMonitor(sinkMonitor);
+ }
- heartBeatExecutor.scheduleAtFixedRate(() -> {
+ private void initHealthService() {
+ healthService = new HealthService(adminServiceStub,
adminServiceBlockingStub, connectorRuntimeConfig);
+ }
- ReportHeartBeatRequest heartBeat = new ReportHeartBeatRequest();
- heartBeat.setAddress(IPUtils.getLocalAddress());
-
heartBeat.setReportedTimeStamp(String.valueOf(System.currentTimeMillis()));
- heartBeat.setJobID(connectorRuntimeConfig.getJobID());
+ private void initVerfiyService() {
+ verifyService = new VerifyService(adminServiceStub,
adminServiceBlockingStub, connectorRuntimeConfig);
+ }
- Metadata metadata =
Metadata.newBuilder().setType(ReportHeartBeatRequest.class.getSimpleName()).build();
+ @Override
+ public void start() throws Exception {
+ // start offsetMgmtService
+ offsetManagementService.start();
- Payload request = Payload.newBuilder().setMetadata(metadata)
-
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(heartBeat)))).build())
- .build();
+ monitorService.start();
- requestObserver.onNext(request);
- }, 5, 5, TimeUnit.SECONDS);
+ healthService.start();
- // start offsetMgmtService
- offsetManagementService.start();
isRunning = true;
// start sinkService
sinkService.execute(() -> {
@@ -320,32 +317,34 @@ public class ConnectorRuntime implements Runtime {
startSinkConnector();
} catch (Exception e) {
isFailed = true;
- log.error("sink connector [{}] start fail",
sinkConnector.name(), e);
+ log.error("sink connector start fail", e); // pass the Throwable itself so the stack trace is logged
try {
this.stop();
} catch (Exception ex) {
log.error("Failed to stop after exception", ex);
}
- throw new RuntimeException(e);
+ } finally {
+ System.exit(-1);
}
});
- // start
+ // start sourceService
sourceService.execute(() -> {
try {
startSourceConnector();
} catch (Exception e) {
isFailed = true;
- log.error("source connector [{}] start fail",
sourceConnector.name(), e);
+ log.error("source connector start fail", e);
try {
this.stop();
} catch (Exception ex) {
log.error("Failed to stop after exception", ex);
}
- throw new RuntimeException(e);
+ } finally {
+ System.exit(-1);
}
});
- reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.RUNNING);
+ statusService.reportJobStatus(connectorRuntimeConfig.getJobID(),
JobState.RUNNING);
}
@Override
@@ -353,26 +352,30 @@ public class ConnectorRuntime implements Runtime {
log.info("ConnectorRuntime start stop");
isRunning = false;
if (isFailed) {
- reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.FAIL);
+ statusService.reportJobStatus(connectorRuntimeConfig.getJobID(),
JobState.FAIL);
} else {
- reportJobRequest(connectorRuntimeConfig.getJobID(),
JobState.COMPLETE);
+ statusService.reportJobStatus(connectorRuntimeConfig.getJobID(),
JobState.COMPLETE);
}
sourceConnector.stop();
sinkConnector.stop();
+ monitorService.stop();
+ healthService.stop();
sourceService.shutdown();
sinkService.shutdown();
- heartBeatExecutor.shutdown();
- reportVerifyExecutor.shutdown();
- requestObserver.onCompleted();
+ verifyService.stop();
+ statusService.stop();
if (channel != null && !channel.isShutdown()) {
- channel.shutdown();
+ channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
+ log.info("ConnectorRuntime stopped");
}
private void startSourceConnector() throws Exception {
sourceConnector.start();
while (isRunning) {
+ long sourceStartTime = System.currentTimeMillis();
List<ConnectRecord> connectorRecordList = sourceConnector.poll();
+ long sinkStartTime = System.currentTimeMillis();
// TODO: use producer pub record to storage replace below
if (connectorRecordList != null && !connectorRecordList.isEmpty())
{
for (ConnectRecord record : connectorRecordList) {
@@ -381,19 +384,14 @@ public class ConnectorRuntime implements Runtime {
record.addExtension("recordUniqueId",
record.getRecordId());
}
- queue.put(record);
-
- // if enabled incremental data reporting consistency check
- if
(connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) {
- reportVerifyRequest(record, connectorRuntimeConfig,
ConnectorStage.SOURCE);
- }
-
// set a callback for this record
// if used the memory storage callback will be triggered
after sink put success
record.setCallback(new SendMessageCallback() {
@Override
public void onSuccess(SendResult result) {
log.debug("send record to sink callback success,
record: {}", record);
+ long sinkEndTime = System.currentTimeMillis();
+ sinkMonitor.recordProcess(sinkEndTime -
sinkStartTime);
// commit record
sourceConnector.commit(record);
if (record.getPosition() != null) {
@@ -424,6 +422,16 @@ public class ConnectorRuntime implements Runtime {
}
}
});
+
+ queue.put(record);
+ long sourceEndTime = System.currentTimeMillis();
+ sourceMonitor.recordProcess(sourceEndTime -
sourceStartTime);
+
+ // if enabled incremental data reporting consistency check
+ if
(connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) {
+ verifyService.reportVerifyRequest(record,
ConnectorStage.SOURCE);
+ }
+
}
}
}
@@ -438,64 +446,6 @@ public class ConnectorRuntime implements Runtime {
return result;
}
- private void reportVerifyRequest(ConnectRecord record,
ConnectorRuntimeConfig connectorRuntimeConfig, ConnectorStage connectorStage) {
- reportVerifyExecutor.submit(() -> {
- try {
- // use record data + recordUniqueId for md5
- String md5Str = md5(record.getData().toString() +
record.getExtension("recordUniqueId"));
- ReportVerifyRequest reportVerifyRequest = new
ReportVerifyRequest();
-
reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID());
-
reportVerifyRequest.setJobID(connectorRuntimeConfig.getJobID());
- reportVerifyRequest.setRecordID(record.getRecordId());
- reportVerifyRequest.setRecordSig(md5Str);
- reportVerifyRequest.setConnectorName(
- IPUtils.getLocalAddress() + "_" +
connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion());
- reportVerifyRequest.setConnectorStage(connectorStage.name());
-
reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition()));
-
- Metadata metadata =
Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build();
-
- Payload request = Payload.newBuilder().setMetadata(metadata)
- .setBody(
-
Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest))))
- .build())
- .build();
-
- requestObserver.onNext(request);
- } catch (Exception e) {
- log.error("Failed to report verify request", e);
- }
- });
- }
-
- private void reportJobRequest(String jobId, JobState jobState) throws
InterruptedException {
- ReportJobRequest reportJobRequest = new ReportJobRequest();
- reportJobRequest.setJobID(jobId);
- reportJobRequest.setState(jobState);
- Metadata metadata = Metadata.newBuilder()
- .setType(ReportJobRequest.class.getSimpleName())
- .build();
- Payload payload = Payload.newBuilder()
- .setMetadata(metadata)
-
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportJobRequest))))
- .build())
- .build();
- requestObserver.onNext(payload);
- }
-
- private String md5(String input) {
- try {
- MessageDigest md = MessageDigest.getInstance("MD5");
- byte[] messageDigest = md.digest(input.getBytes());
- StringBuilder sb = new StringBuilder();
- for (byte b : messageDigest) {
- sb.append(String.format("%02x", b));
- }
- return sb.toString();
- } catch (NoSuchAlgorithmException e) {
- throw new RuntimeException(e);
- }
- }
public Optional<RecordOffsetManagement.SubmittedPosition>
prepareToUpdateRecordOffset(ConnectRecord record) {
return
Optional.of(this.offsetManagement.submitRecord(record.getPosition()));
@@ -589,7 +539,7 @@ public class ConnectorRuntime implements Runtime {
sinkConnector.put(connectRecordList);
// if enabled incremental data reporting consistency check
if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) {
- reportVerifyRequest(connectRecord, connectorRuntimeConfig,
ConnectorStage.SINK);
+ verifyService.reportVerifyRequest(connectRecord,
ConnectorStage.SINK);
}
}
}
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/health/HealthService.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/health/HealthService.java
new file mode 100644
index 000000000..54f924874
--- /dev/null
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/health/HealthService.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.service.health;
+
+import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
+import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
+import org.apache.eventmesh.common.utils.IPUtils;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.runtime.connector.ConnectorRuntimeConfig;
+
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.grpc.stub.StreamObserver;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.UnsafeByteOperations;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Periodically sends a {@link ReportHeartBeatRequest} for the current job to the admin server
+ * over a bidirectional gRPC stream.
+ *
+ * <p>The stream is opened in the constructor. {@link #start()} schedules a heartbeat every
+ * 5 seconds (the first one after 5 seconds); {@link #stop()} shuts the scheduler down and
+ * completes the request stream.
+ */
+@Slf4j
+public class HealthService {
+
+    private final ScheduledExecutorService scheduler;
+
+    private StreamObserver<Payload> requestObserver;
+
+    private StreamObserver<Payload> responseObserver;
+
+    private AdminServiceGrpc.AdminServiceStub adminServiceStub;
+
+    private AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub;
+
+    private ConnectorRuntimeConfig connectorRuntimeConfig;
+
+
+    /**
+     * Creates the service and opens the bidirectional stream used for heartbeats.
+     *
+     * @param adminServiceStub         async stub on which the bidirectional stream is opened
+     * @param adminServiceBlockingStub blocking stub; stored but not used by this class
+     * @param connectorRuntimeConfig   runtime config supplying the job id put into each heartbeat
+     */
+    public HealthService(AdminServiceGrpc.AdminServiceStub adminServiceStub,
+        AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub,
+        ConnectorRuntimeConfig connectorRuntimeConfig) {
+        this.adminServiceStub = adminServiceStub;
+        this.adminServiceBlockingStub = adminServiceBlockingStub;
+        this.connectorRuntimeConfig = connectorRuntimeConfig;
+
+        this.scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        responseObserver = new StreamObserver<Payload>() {
+            @Override
+            public void onNext(Payload response) {
+                log.debug("health service receive message: {}|{} ", response.getMetadata(), response.getBody());
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                // Pass the Throwable as the last argument so the stack trace is logged, not just the message.
+                log.error("health service receive error message: {}", t.getMessage(), t);
+            }
+
+            @Override
+            public void onCompleted() {
+                log.info("health service finished receive message and completed");
+            }
+        };
+        requestObserver = this.adminServiceStub.invokeBiStream(responseObserver);
+    }
+
+    /**
+     * Starts the periodic heartbeat reporting.
+     */
+    public void start() {
+        this.healthReport();
+    }
+
+    /**
+     * Schedules the heartbeat task at a fixed rate of 5 seconds with an initial delay of 5 seconds.
+     */
+    public void healthReport() {
+        scheduler.scheduleAtFixedRate(() -> {
+            // scheduleAtFixedRate suppresses all later executions once a run throws, so a single
+            // failed heartbeat must be caught here or heartbeats would stop silently.
+            try {
+                ReportHeartBeatRequest heartBeat = new ReportHeartBeatRequest();
+                heartBeat.setAddress(IPUtils.getLocalAddress());
+                heartBeat.setReportedTimeStamp(String.valueOf(System.currentTimeMillis()));
+                heartBeat.setJobID(connectorRuntimeConfig.getJobID());
+
+                Metadata metadata = Metadata.newBuilder().setType(ReportHeartBeatRequest.class.getSimpleName()).build();
+
+                Payload request = Payload.newBuilder().setMetadata(metadata)
+                    .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(heartBeat)))).build())
+                    .build();
+
+                requestObserver.onNext(request);
+            } catch (Exception e) {
+                log.error("health service failed to report heartbeat", e);
+            }
+        }, 5, 5, TimeUnit.SECONDS);
+    }
+
+
+    /**
+     * Stops scheduling further heartbeats and completes the request stream.
+     */
+    public void stop() {
+        scheduler.shutdown();
+        if (requestObserver != null) {
+            requestObserver.onCompleted();
+        }
+    }
+
+}
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/MonitorService.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/MonitorService.java
new file mode 100644
index 000000000..f5af7596c
--- /dev/null
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/MonitorService.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.service.monitor;
+
+import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
+import org.apache.eventmesh.common.remote.request.ReportMonitorRequest;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.openconnect.api.monitor.Monitor;
+import org.apache.eventmesh.openconnect.api.monitor.MonitorRegistry;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.grpc.stub.StreamObserver;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.UnsafeByteOperations;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class MonitorService {
+
+ private final ScheduledExecutorService scheduler;
+
+ private StreamObserver<Payload> requestObserver;
+
+ private StreamObserver<Payload> responseObserver;
+
+ private AdminServiceGrpc.AdminServiceStub adminServiceStub;
+
+ private AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub;
+
+
+ public MonitorService(AdminServiceGrpc.AdminServiceStub adminServiceStub,
AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub) {
+ this.adminServiceStub = adminServiceStub;
+ this.adminServiceBlockingStub = adminServiceBlockingStub;
+
+ this.scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ responseObserver = new StreamObserver<Payload>() {
+ @Override
+ public void onNext(Payload response) {
+ log.debug("monitor service receive message: {}|{} ",
response.getMetadata(), response.getBody());
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ log.error("monitor service receive error message: {}",
t.getMessage());
+ }
+
+ @Override
+ public void onCompleted() {
+ log.info("monitor service finished receive message and
completed");
+ }
+ };
+ requestObserver =
this.adminServiceStub.invokeBiStream(responseObserver);
+ }
+
+ public void registerMonitor(Monitor monitor) {
+ MonitorRegistry.registerMonitor(monitor);
+ }
+
+ public void start() {
+ this.startReporting();
+ }
+
+ public void startReporting() {
+ scheduler.scheduleAtFixedRate(() -> {
+ List<Monitor> monitors = MonitorRegistry.getMonitors();
+ for (Monitor monitor : monitors) {
+ monitor.printMetrics();
+ reportToAdminService(monitor);
+ }
+ }, 5, 30, TimeUnit.SECONDS);
+ }
+
+ private void reportToAdminService(Monitor monitor) {
+ ReportMonitorRequest request = new ReportMonitorRequest();
+ if (monitor instanceof SourceMonitor) {
+ SourceMonitor sourceMonitor = (SourceMonitor) monitor;
+ request.setTaskID(sourceMonitor.getTaskId());
+ request.setJobID(sourceMonitor.getJobId());
+ request.setAddress(sourceMonitor.getIp());
+ request.setConnectorStage(sourceMonitor.getConnectorStage());
+
request.setTotalReqNum(sourceMonitor.getTotalRecordNum().longValue());
+
request.setTotalTimeCost(sourceMonitor.getTotalTimeCost().longValue());
+ request.setMaxTimeCost(sourceMonitor.getMaxTimeCost().longValue());
+ request.setAvgTimeCost(sourceMonitor.getAverageTime());
+ request.setTps(sourceMonitor.getTps());
+ } else if (monitor instanceof SinkMonitor) {
+ SinkMonitor sinkMonitor = (SinkMonitor) monitor;
+ request.setTaskID(sinkMonitor.getTaskId());
+ request.setJobID(sinkMonitor.getJobId());
+ request.setAddress(sinkMonitor.getIp());
+ request.setConnectorStage(sinkMonitor.getConnectorStage());
+
request.setTotalReqNum(sinkMonitor.getTotalRecordNum().longValue());
+
request.setTotalTimeCost(sinkMonitor.getTotalTimeCost().longValue());
+ request.setMaxTimeCost(sinkMonitor.getMaxTimeCost().longValue());
+ request.setAvgTimeCost(sinkMonitor.getAverageTime());
+ request.setTps(sinkMonitor.getTps());
+ } else {
+ throw new IllegalArgumentException("Unsupported monitor: " +
monitor);
+ }
+
+ Metadata metadata = Metadata.newBuilder()
+ .setType(ReportMonitorRequest.class.getSimpleName())
+ .build();
+ Payload payload = Payload.newBuilder()
+ .setMetadata(metadata)
+
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(request))))
+ .build())
+ .build();
+ requestObserver.onNext(payload);
+ }
+
+ public void stop() {
+ scheduler.shutdown();
+ if (requestObserver != null) {
+ requestObserver.onCompleted();
+ }
+ }
+
+}
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SinkMonitor.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SinkMonitor.java
new file mode 100644
index 000000000..b27b44da7
--- /dev/null
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SinkMonitor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.service.monitor;
+
+import org.apache.eventmesh.common.enums.ConnectorStage;
+import org.apache.eventmesh.openconnect.api.monitor.AbstractConnectorMonitor;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Getter
+@Setter
+public class SinkMonitor extends AbstractConnectorMonitor {
+ // Monitor for the sink side of a connector; the three overrides below only forward to AbstractConnectorMonitor.
+ private String connectorStage = ConnectorStage.SINK.name(); // stage label; MonitorService reads it through the Lombok getter when building a report
+ // Passes task id, job id and ip straight through to the superclass constructor.
+ public SinkMonitor(String taskId, String jobId, String ip) {
+ super(taskId, jobId, ip);
+ }
+ // Pass-through: adds nothing to the inherited single-argument recording.
+ @Override
+ public void recordProcess(long timeCost) {
+ super.recordProcess(timeCost);
+ }
+ // Pass-through: adds nothing to the inherited batch recording.
+ @Override
+ public void recordProcess(int recordCount, long timeCost) {
+ super.recordProcess(recordCount, timeCost);
+ }
+ // Pass-through: printing is left entirely to the superclass.
+ @Override
+ public void printMetrics() {
+ super.printMetrics();
+ }
+}
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SourceMonitor.java
similarity index 51%
copy from
eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
copy to
eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SourceMonitor.java
index e389357d9..3895c8df1 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SourceMonitor.java
@@ -15,7 +15,33 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.util;
+package org.apache.eventmesh.runtime.service.monitor;
-public class RuntimeUtils {
+import org.apache.eventmesh.common.enums.ConnectorStage;
+import org.apache.eventmesh.openconnect.api.monitor.AbstractConnectorMonitor;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Getter
+@Setter
+public class SourceMonitor extends AbstractConnectorMonitor {
+ // Monitor for the source side of a connector; both overrides below only forward to AbstractConnectorMonitor.
+ private String connectorStage = ConnectorStage.SOURCE.name(); // stage label; MonitorService reads it through the Lombok getter when building a report
+ // Passes task id, job id and ip straight through to the superclass constructor.
+ public SourceMonitor(String taskId, String jobId, String ip) {
+ super(taskId, jobId, ip);
+ }
+ // Pass-through: adds nothing to the inherited batch recording.
+ @Override
+ public void recordProcess(int recordCount, long timeCost) {
+ super.recordProcess(recordCount, timeCost);
+ }
+ // Pass-through: printing is left entirely to the superclass.
+ @Override
+ public void printMetrics() {
+ super.printMetrics();
+ }
}
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/status/StatusService.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/status/StatusService.java
new file mode 100644
index 000000000..e40686f57
--- /dev/null
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/status/StatusService.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.service.status;
+
+import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
+import org.apache.eventmesh.common.remote.JobState;
+import org.apache.eventmesh.common.remote.request.ReportJobRequest;
+import org.apache.eventmesh.common.utils.IPUtils;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import java.util.Objects;
+
+import io.grpc.stub.StreamObserver;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.UnsafeByteOperations;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class StatusService {
+
+ private StreamObserver<Payload> requestObserver;
+
+ private StreamObserver<Payload> responseObserver;
+
+ private AdminServiceGrpc.AdminServiceStub adminServiceStub;
+
+ private AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub;
+
+
+ public StatusService(AdminServiceGrpc.AdminServiceStub adminServiceStub,
AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub) {
+ this.adminServiceStub = adminServiceStub;
+ this.adminServiceBlockingStub = adminServiceBlockingStub;
+
+ responseObserver = new StreamObserver<Payload>() {
+ @Override
+ public void onNext(Payload response) {
+ log.debug("health service receive message: {}|{} ",
response.getMetadata(), response.getBody());
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ log.error("health service receive error message: {}",
t.getMessage());
+ }
+
+ @Override
+ public void onCompleted() {
+ log.info("health service finished receive message and
completed");
+ }
+ };
+ requestObserver =
this.adminServiceStub.invokeBiStream(responseObserver);
+ }
+
+ public void reportJobStatus(String jobId, JobState jobState) {
+ ReportJobRequest reportJobRequest = new ReportJobRequest();
+ reportJobRequest.setJobID(jobId);
+ reportJobRequest.setState(jobState);
+ reportJobRequest.setAddress(IPUtils.getLocalAddress());
+ Metadata metadata = Metadata.newBuilder()
+ .setType(ReportJobRequest.class.getSimpleName())
+ .build();
+ Payload payload = Payload.newBuilder()
+ .setMetadata(metadata)
+
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportJobRequest))))
+ .build())
+ .build();
+ log.info("report job state request: {}",
JsonUtils.toJSONString(reportJobRequest));
+ requestObserver.onNext(payload);
+ }
+
+ public void stop() {
+ if (requestObserver != null) {
+ requestObserver.onCompleted();
+ }
+ }
+}
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/verify/VerifyService.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/verify/VerifyService.java
new file mode 100644
index 000000000..8bcb72199
--- /dev/null
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/verify/VerifyService.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.service.verify;
+
+import org.apache.eventmesh.common.enums.ConnectorStage;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
+import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
+import org.apache.eventmesh.common.utils.IPUtils;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.runtime.connector.ConnectorRuntimeConfig;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import io.grpc.stub.StreamObserver;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.UnsafeByteOperations;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class VerifyService {
+
+ private final ExecutorService reportVerifyExecutor;
+
+ private StreamObserver<Payload> requestObserver;
+
+ private StreamObserver<Payload> responseObserver;
+
+ private AdminServiceGrpc.AdminServiceStub adminServiceStub;
+
+ private AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub;
+
+ private ConnectorRuntimeConfig connectorRuntimeConfig;
+
+
+ public VerifyService(AdminServiceGrpc.AdminServiceStub adminServiceStub,
AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub,
+ ConnectorRuntimeConfig connectorRuntimeConfig) {
+ this.adminServiceStub = adminServiceStub;
+ this.adminServiceBlockingStub = adminServiceBlockingStub;
+ this.connectorRuntimeConfig = connectorRuntimeConfig;
+
+ this.reportVerifyExecutor = Executors.newSingleThreadExecutor();
+
+ responseObserver = new StreamObserver<Payload>() {
+ @Override
+ public void onNext(Payload response) {
+ log.debug("verify service receive message: {}|{} ",
response.getMetadata(), response.getBody());
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ log.error("verify service receive error message: {}",
t.getMessage());
+ }
+
+ @Override
+ public void onCompleted() {
+ log.info("verify service finished receive message and
completed");
+ }
+ };
+ requestObserver =
this.adminServiceStub.invokeBiStream(responseObserver);
+ }
+
+ public void reportVerifyRequest(ConnectRecord record, ConnectorStage
connectorStage) {
+ reportVerifyExecutor.submit(() -> {
+ try {
+ byte[] data = (byte[]) record.getData();
+ // use record data + recordUniqueId for md5
+ String md5Str = md5(Arrays.toString(data) +
record.getExtension("recordUniqueId"));
+ ReportVerifyRequest reportVerifyRequest = new
ReportVerifyRequest();
+
reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID());
+
reportVerifyRequest.setJobID(connectorRuntimeConfig.getJobID());
+
reportVerifyRequest.setRecordID(record.getExtension("recordUniqueId"));
+ reportVerifyRequest.setRecordSig(md5Str);
+ reportVerifyRequest.setConnectorName(
+ IPUtils.getLocalAddress() + "_" +
connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion());
+ reportVerifyRequest.setConnectorStage(connectorStage.name());
+
reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition()));
+
+ Metadata metadata =
Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build();
+
+ Payload request = Payload.newBuilder().setMetadata(metadata)
+ .setBody(
+
Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest))))
+ .build())
+ .build();
+ requestObserver.onNext(request);
+ } catch (Exception e) {
+ log.error("Failed to report verify request", e);
+ }
+ });
+ }
+
+ private String md5(String input) {
+ try {
+ MessageDigest md = MessageDigest.getInstance("MD5");
+ byte[] messageDigest = md.digest(input.getBytes());
+ StringBuilder sb = new StringBuilder();
+ for (byte b : messageDigest) {
+ sb.append(String.format("%02x", b));
+ }
+ return sb.toString();
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void stop() {
+ reportVerifyExecutor.shutdown();
+ if (requestObserver != null) {
+ requestObserver.onCompleted();
+ }
+ }
+
+}
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
index e389357d9..844a9638a 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java
@@ -17,5 +17,18 @@
package org.apache.eventmesh.runtime.util;
+import java.util.Random;
+
public class RuntimeUtils {
+
+    /**
+     * Picks one admin server address at random from a semicolon-separated list.
+     *
+     * <p>Entries are trimmed and blank entries are skipped, so inputs such as {@code "a;;b"} or
+     * {@code ";a;"} never yield an empty address. A plain length check on the split result is not
+     * enough: {@code "".split(";")} returns a one-element array holding the empty string.
+     *
+     * @param adminServerAddrList addresses separated by {@code ;}
+     * @return one of the non-blank addresses, trimmed
+     * @throws IllegalArgumentException if the list is {@code null} or holds no non-blank address
+     */
+    public static String getRandomAdminServerAddr(String adminServerAddrList) {
+        if (adminServerAddrList == null) {
+            throw new IllegalArgumentException("Admin server address list is empty");
+        }
+        String[] addresses = adminServerAddrList.split(";");
+        // Compact the usable entries to the front of the array; count is how many there are.
+        int count = 0;
+        for (String address : addresses) {
+            String candidate = address.trim();
+            if (!candidate.isEmpty()) {
+                addresses[count++] = candidate;
+            }
+        }
+        if (count == 0) {
+            throw new IllegalArgumentException("Admin server address list is empty");
+        }
+        Random random = new Random();
+        return addresses[random.nextInt(count)];
+    }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]