This is an automated email from the ASF dual-hosted git repository. mikexue pushed a commit to branch eventmesh-function in repository https://gitbox.apache.org/repos/asf/eventmesh.git
commit 62e8612de0f791a5ca83805dc723e9c7ecf2c5d0 Author: xwm1992 <[email protected]> AuthorDate: Fri Jun 28 17:24:03 2024 +0800 fix checkStyle error --- .../position/impl/MysqlPositionHandler.java | 2 +- .../connector/canal/dialect/MysqlDialect.java | 4 -- .../eventmesh/runtime/boot/RuntimeInstance.java | 50 +++++++++++----------- 3 files changed, 26 insertions(+), 30 deletions(-) diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java index d0d32a5c6..525fe02c0 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java @@ -152,9 +152,9 @@ public class MysqlPositionHandler extends PositionHandler { CanalRecordPartition partition = new CanalRecordPartition(); partition.setTimeStamp(position.getTimestamp()); partition.setJournalName(position.getJournalName()); + recordPosition.setRecordPartition(partition); CanalRecordOffset offset = new CanalRecordOffset(); offset.setOffset(position.getPosition()); - recordPosition.setRecordPartition(partition); recordPosition.setRecordOffset(offset); recordPositionList.add(recordPosition); } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java index 1a47a0521..acd491ba6 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java @@ -19,10 +19,6 @@ package org.apache.eventmesh.connector.canal.dialect; import org.apache.eventmesh.connector.canal.template.MysqlSqlTemplate; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.support.lob.LobHandler; diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java index 56b3a5967..0fade897f 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java @@ -45,7 +45,7 @@ public class RuntimeInstance { private Map<String, RegisterServerInfo> adminServerInfoMap = new HashMap<>(); -// private final RegistryService registryService; + private final RegistryService registryService; private Runtime runtime; @@ -57,20 +57,20 @@ public class RuntimeInstance { public RuntimeInstance(RuntimeInstanceConfig runtimeInstanceConfig) { this.runtimeInstanceConfig = runtimeInstanceConfig; -// this.registryService = RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType()); + this.registryService = RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType()); } public void init() throws Exception { -// registryService.init(); -// QueryInstances queryInstances = new QueryInstances(); -// queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName()); -// queryInstances.setHealth(true); -// List<RegisterServerInfo> adminServerRegisterInfoList = registryService.selectInstances(queryInstances); -// if (!adminServerRegisterInfoList.isEmpty()) { -// adminServerAddr = getRandomAdminServerAddr(adminServerRegisterInfoList); -// } else { -// throw new RuntimeException("admin server address is empty, please check"); -// } + registryService.init(); + QueryInstances queryInstances = new QueryInstances(); + queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName()); + queryInstances.setHealth(true); + List<RegisterServerInfo> adminServerRegisterInfoList = registryService.selectInstances(queryInstances); + if (!adminServerRegisterInfoList.isEmpty()) { + adminServerAddr = getRandomAdminServerAddr(adminServerRegisterInfoList); + } else { + throw new RuntimeException("admin server address is empty, please check"); + } runtimeInstanceConfig.setAdminServerAddr(adminServerAddr); runtimeFactory = initRuntimeFactory(runtimeInstanceConfig); runtime = runtimeFactory.createRuntime(runtimeInstanceConfig); @@ -80,19 +80,19 @@ public class RuntimeInstance { public void start() throws Exception { if (!StringUtils.isBlank(adminServerAddr)) { -// registryService.subscribe((event) -> { -// log.info("runtime receive registry event: {}", event); -// List<RegisterServerInfo> registerServerInfoList = event.getInstances(); -// Map<String, RegisterServerInfo> registerServerInfoMap = new HashMap<>(); -// for (RegisterServerInfo registerServerInfo : registerServerInfoList) { -// registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo); -// } -// if (!registerServerInfoMap.isEmpty()) { -// adminServerInfoMap = registerServerInfoMap; -// updateAdminServerAddr(); -// } -// -// }, runtimeInstanceConfig.getAdminServiceName()); + registryService.subscribe((event) -> { + log.info("runtime receive registry event: {}", event); + List<RegisterServerInfo> registerServerInfoList = event.getInstances(); + Map<String, RegisterServerInfo> registerServerInfoMap = new HashMap<>(); + for (RegisterServerInfo registerServerInfo : registerServerInfoList) { + registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo); + } + if (!registerServerInfoMap.isEmpty()) { + adminServerInfoMap = registerServerInfoMap; + updateAdminServerAddr(); + } + + }, runtimeInstanceConfig.getAdminServiceName()); runtime.start(); isStarted = true; } else { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
