This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch feat/runtimer-config in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit 958b6dffe0f907e06af9cc535ca8bebd8ee24b82 Author: 2011shenlin <[email protected]> AuthorDate: Thu Mar 23 11:12:06 2023 +0800 feat:add watch runner config. --- adapter/runtimer/pom.xml | 49 ++++++++- .../runtimer/common/entity/TargetRunnerConfig.java | 49 ++++++++- .../service/TargetRunnerConfigOnFileObserver.java | 85 +++++++++++---- .../src/main/resources/target-runner.config | 7 -- .../runtimer/src/main/resources/target-runner.json | 23 ++++ .../TargetRunnerConfigOnFileObserverTest.java | 94 +++++++++++++++++ .../runtimer/service/TargetRunnerConfigUtil.java | 116 +++++++++++++++++++++ .../runtimer/service/TestConstants.java | 7 ++ 8 files changed, 396 insertions(+), 34 deletions(-) diff --git a/adapter/runtimer/pom.xml b/adapter/runtimer/pom.xml index 9296b42..49578b4 100644 --- a/adapter/runtimer/pom.xml +++ b/adapter/runtimer/pom.xml @@ -22,6 +22,14 @@ <artifactId>rocketmq-eventbridge-adapter-runtimer</artifactId> <version>1.0.0</version> + + <properties> + <maven.compiler.source>8</maven.compiler.source> + <maven.compiler.target>8</maven.compiler.target> + <commons.io.version>2.8.0</commons.io.version> + <junit.version>4.10</junit.version> + </properties> + <dependencies> <dependency> <groupId>io.openmessaging</groupId> @@ -59,11 +67,44 @@ <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>${commons.io.version}</version> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-eventbridge-common</artifactId> + <version>1.0.0</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <version>RELEASE</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <version>RELEASE</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>4.0.3</version> + <scope>test</scope> + </dependency> </dependencies> - <properties> - <maven.compiler.source>8</maven.compiler.source> - <maven.compiler.target>8</maven.compiler.target> - </properties> </project> \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java index 45bc5f1..a2eb405 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity; import java.io.Serializable; +import java.util.List; import java.util.Map; import java.util.Objects; import lombok.Data; @@ -33,7 +34,7 @@ public class TargetRunnerConfig implements Serializable { /** * All data are reserved in this map. */ - private Map<String, String> properties; + private List<Map<String, String>> components; @Override public boolean equals(Object o) { @@ -42,12 +43,12 @@ public class TargetRunnerConfig implements Serializable { if (o == null || getClass() != o.getClass()) return false; TargetRunnerConfig config = (TargetRunnerConfig) o; - return Objects.equals(name, config.name) && Objects.equals(properties, config.properties); + return Objects.equals(name, config.name) && isEqualsComponents(components, config.getComponents()); } @Override public int hashCode() { - return Objects.hash(name, properties); + return Objects.hash(name, components); } @Override @@ -55,7 +56,47 @@ public class TargetRunnerConfig implements Serializable { //TODO return "TargetRunnerConfig{" + "connectName='" + name + '\'' + - ", properties=" + properties + + ", properties=" + components + '}'; } + + private boolean isEqualsComponents(List<Map<String, String>> source, List<Map<String, String>> target) { + if (source == null || target == null) { + if (source != target) { + return false; + } else { + return true; + } + } + + if (source.isEmpty() || target.isEmpty()) { + if (source.isEmpty() && target.isEmpty()) { + return true; + } else { + return false; + } + } + + if (source.size() != target.size()) { + return false; + } + for (int index = 0; index < source.size(); index++) { + Map<String, String> sourceComponent = source.get(index); + Map<String, String> targetComponent = target.get(index); + if (sourceComponent.size() != targetComponent.size()) { + return false; + } + for (Map.Entry<String, String> entry : sourceComponent.entrySet()) { + String element = targetComponent.get(entry.getKey()); + if (element == null && entry.getValue() == null) { + return true; + } else if (element.equals(entry.getValue())) { + return true; + } else { + return false; + } + } + } + return true; + } } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java index 27269b8..b963c72 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java @@ -17,51 +17,78 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.service; +import com.google.common.collect.Maps; +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Type; import java.nio.file.FileSystems; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchKey; import java.nio.file.WatchService; +import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin; -import org.springframework.beans.factory.annotation.Value; +import org.apache.rocketmq.eventbridge.exception.EventBridgeException; @Slf4j public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfigObserver { - @Value("${runtimer.storePathRootDir:}") - private String storeRootPath; - - @Value("${runtimer.store.targetRunner.config:targetRunner-config}") - private String fileName; + private String pathName; private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor( ThreadUtils.newThreadFactory("TargetRunnerConfigOnFileObserver", false)); - public TargetRunnerConfigOnFileObserver(Plugin plugin) { - this.addListen(storeRootPath, fileName, this); + public TargetRunnerConfigOnFileObserver(String pathName) { + this.pathName = pathName; + super.getTargetRunnerConfig().addAll(getLatestTargetRunnerConfig()); + this.addListen(pathName, this); + } + + public TargetRunnerConfigOnFileObserver() { + super.getTargetRunnerConfig().addAll(getLatestTargetRunnerConfig()); + this.addListen(pathName, this); } @Override public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() { - return null; + String config = null; + try { + File file = new File(pathName); + config = FileUtils.readFileToString(file, "UTF-8"); + Type workerConfigType = new TypeToken<HashSet<TargetRunnerConfig>>() { + }.getType(); + Set<TargetRunnerConfig> taskConfigList = new Gson().fromJson(config, workerConfigType); + return taskConfigList; + } catch (IOException e) { + throw new EventBridgeException("Load component properties failed.", e); + } catch (Throwable e) { + log.error("fail to parse config={} from file={}", config, pathName); + throw e; + } } - public void addListen(String pathStr, String fileName, + public void addListen(String pathName, TargetRunnerConfigOnFileObserver pusherConfigOnFileService) { - log.info("Watching task file changing:{}", pathStr + fileName); + log.info("Watching task file changing:{}", pathName); + int index = pathName.lastIndexOf("/"); + String filePath = pathName.substring(0, index); + String fileName = pathName.substring(index + 1); service.scheduleAtFixedRate(() -> { try { WatchService watchService = FileSystems.getDefault() .newWatchService(); - Path path = Paths.get(pathStr); + Path path = Paths.get(filePath); path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY); WatchKey watchKey; @@ -81,12 +108,32 @@ public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfig } public void diff() { - Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig(); - Set<TargetRunnerConfig> last = super.getTargetRunnerConfig(); - TargetRunnerConfig changed = null; - super.onAddTargetRunner(changed); - super.onUpdateTargetRunner(changed); - super.onDeleteTargetRunner(changed); + Map<String, TargetRunnerConfig> lastMap = toMap(super.getTargetRunnerConfig()); + Map<String, TargetRunnerConfig> latestMap = toMap(this.getLatestTargetRunnerConfig()); + lastMap.entrySet().forEach(entry -> { + TargetRunnerConfig latest = latestMap.get(entry.getKey()); + if (latest == null) { + super.onDeleteTargetRunner(entry.getValue()); + } else if (!latest.equals(entry.getValue())) { + super.onUpdateTargetRunner(entry.getValue()); + } + }); + + latestMap.entrySet().forEach(entry -> { + TargetRunnerConfig latest = lastMap.get(entry.getKey()); + if (latest == null) { + super.onAddTargetRunner(entry.getValue()); + } + }); + } + + private Map<String, TargetRunnerConfig> toMap(Set<TargetRunnerConfig> targetRunnerConfigs) { + if (targetRunnerConfigs == null || targetRunnerConfigs.isEmpty()) { + return Maps.newHashMapWithExpectedSize(0); + } + Map<String, TargetRunnerConfig> map = Maps.newHashMap(); + targetRunnerConfigs.forEach(entry -> map.put(entry.getName(), entry)); + return map; } } \ No newline at end of file diff --git a/adapter/runtimer/src/main/resources/target-runner.config b/adapter/runtimer/src/main/resources/target-runner.config deleted file mode 100644 index 04458c6..0000000 --- a/adapter/runtimer/src/main/resources/target-runner.config +++ /dev/null @@ -1,7 +0,0 @@ -[ - { - }, - { - - } -] \ No newline at end of file diff --git a/adapter/runtimer/src/main/resources/target-runner.json b/adapter/runtimer/src/main/resources/target-runner.json new file mode 100644 index 0000000..11b5088 --- /dev/null +++ b/adapter/runtimer/src/main/resources/target-runner.json @@ -0,0 +1,23 @@ +[ + { + + "name":"xxxxx", + "components":[ { + "class" : "org.apache.rocketmq.connect.FileStream", + "path" : "xxxxxxxx", + "name" : "demo" + }, + { + "class" : "org.apache.rocketmq.connect.transforms.PatternRename", + "pattern" : "company", + "replacement": "company02" + }, + { + "class" : "org.apache.rocketmq.connect.HttpSinkTask", + "url" : "http://xxxxx/demo" + } ], + "runOptions":{ + "taskSize":1 + } + } +] \ No newline at end of file diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigOnFileObserverTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigOnFileObserverTest.java new file mode 100644 index 0000000..1bb9b3f --- /dev/null +++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigOnFileObserverTest.java @@ -0,0 +1,94 @@ +package org.apache.rocketmq.eventbridge.runtimer.service; + +import java.time.Duration; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; +import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnFileObserver; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.rocketmq.eventbridge.runtimer.service.TestConstants.DEMO_TARGET_RUNNER_CONFIG_FILE_NAME; +import static org.awaitility.Awaitility.await; + +public class TargetRunnerConfigOnFileObserverTest { + + @Test + public void testGetLatestTargetRunnerConfig() { + TargetRunnerConfigOnFileObserver targetRunnerConfigOnFileObserver = new TargetRunnerConfigOnFileObserver(getConfigFilePath()); + System.out.println(targetRunnerConfigOnFileObserver.getLatestTargetRunnerConfig()); + Assert.assertTrue(!targetRunnerConfigOnFileObserver.getLatestTargetRunnerConfig().stream().findFirst().get().getComponents().isEmpty()); + } + + @Test + public void testListen_Add() throws InterruptedException { + String path = getConfigFilePath(); + TargetRunnerConfigUtil.resetTargetRunner(path); + + TargetRunnerConfigOnFileObserver targetRunnerConfigOnFileObserver = new TargetRunnerConfigOnFileObserver(path); + TestTargetRunnerListener targetRunnerListener = new TestTargetRunnerListener(); + targetRunnerConfigOnFileObserver.registerListener(targetRunnerListener); + + Thread.sleep(3000L); + TargetRunnerConfigUtil.addTargetRunner(path); + await().atMost(Duration.ofSeconds(10)).until(() -> { + return targetRunnerListener.addTargetRunner && !targetRunnerListener.updateTargetRunner && !targetRunnerListener.deleteTargetRunner; + }); + } + + @Test + public void testListen_Delete() throws InterruptedException { + String path = getConfigFilePath(); + TargetRunnerConfigUtil.resetTargetRunner(path); + + TargetRunnerConfigOnFileObserver targetRunnerConfigOnFileObserver = new TargetRunnerConfigOnFileObserver(path); + TestTargetRunnerListener targetRunnerListener = new TestTargetRunnerListener(); + targetRunnerConfigOnFileObserver.registerListener(targetRunnerListener); + + Thread.sleep(3000L); + TargetRunnerConfigUtil.deleteTargetRunner(path); + await().atMost(Duration.ofSeconds(10)).until(() -> { + return !targetRunnerListener.addTargetRunner && !targetRunnerListener.updateTargetRunner && targetRunnerListener.deleteTargetRunner; + }); + } + + @Test + public void testListen_Update() throws InterruptedException { + String path = getConfigFilePath(); + TargetRunnerConfigUtil.resetTargetRunner(path); + + TargetRunnerConfigOnFileObserver targetRunnerConfigOnFileObserver = new TargetRunnerConfigOnFileObserver(path); + TestTargetRunnerListener targetRunnerListener = new TestTargetRunnerListener(); + targetRunnerConfigOnFileObserver.registerListener(targetRunnerListener); + + Thread.sleep(3000L); + TargetRunnerConfigUtil.updateTargetRunner(path); + await().atMost(Duration.ofSeconds(10)).until(() -> { + return !targetRunnerListener.addTargetRunner && targetRunnerListener.updateTargetRunner && !targetRunnerListener.deleteTargetRunner; + }); + } + + private String getConfigFilePath() { + return this.getClass().getClassLoader().getResource(DEMO_TARGET_RUNNER_CONFIG_FILE_NAME).getPath(); + } + + class TestTargetRunnerListener implements TargetRunnerListener { + boolean addTargetRunner = false; + boolean updateTargetRunner = false; + boolean deleteTargetRunner = false; + + @Override public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) { + System.out.println("watch target runner"); + this.addTargetRunner = true; + } + + @Override public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) { + System.out.println("watch update runner"); + this.updateTargetRunner = true; + } + + @Override public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) { + System.out.println("watch delete runner"); + this.deleteTargetRunner = true; + } + } +} \ No newline at end of file diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigUtil.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigUtil.java new file mode 100644 index 0000000..05804d3 --- /dev/null +++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigUtil.java @@ -0,0 +1,116 @@ +package org.apache.rocketmq.eventbridge.runtimer.service; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.commons.io.FileUtils; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; +import org.apache.rocketmq.eventbridge.exception.EventBridgeException; + +import static org.apache.rocketmq.eventbridge.runtimer.service.TestConstants.DEMO_TARGET_RUNNER_NAME; + +public class TargetRunnerConfigUtil { + /** + * add target runner + * + * @param path + */ + + public static void addTargetRunner(String path) { + Set<TargetRunnerConfig> latest = getLatest(path); + latest.add(buildNewTargetRunnerConfig()); + persistenceToFile(path, latest); + } + + /** + * update target runner + * + * @param path + */ + public static void updateTargetRunner(String path) { + Set<TargetRunnerConfig> latest = getLatest(path); + latest.stream().findFirst().get().getComponents().get(0).put("newKey", "newValue"); + persistenceToFile(path, latest); + } + + /** + * update target runner + * + * @param path + */ + public static void deleteTargetRunner(String path) { + persistenceToFile(path, Sets.newHashSet()); + } + + /** + * delete target runner + * + * @param path + */ + public static void resetTargetRunner(String path) { + TargetRunnerConfig targetRunnerConfig = buildNewTargetRunnerConfig(DEMO_TARGET_RUNNER_NAME); + try { + BufferedWriter out = new BufferedWriter(new FileWriter(path)); + out.write(new GsonBuilder().setPrettyPrinting().create().toJson(Lists.newArrayList(targetRunnerConfig))); + out.close(); + } catch (IOException e) { + throw new EventBridgeException("Load component properties failed.", e); + } catch (Throwable e) { + throw e; + } + } + + public static Set<TargetRunnerConfig> getLatest(String path) { + String config = null; + try { + File file = new File(path); + config = FileUtils.readFileToString(file, "UTF-8"); + } catch (Throwable e) { + e.printStackTrace(); + } + Type workerConfigType = new TypeToken<HashSet<TargetRunnerConfig>>() { + }.getType(); + return new Gson().fromJson(config, workerConfigType); + } + + private static void persistenceToFile(String path, Set<TargetRunnerConfig> taskConfigList) { + try { + BufferedWriter out = new BufferedWriter(new FileWriter(path)); + out.write(new GsonBuilder().setPrettyPrinting().create().toJson(taskConfigList)); + out.close(); + } catch (IOException e) { + throw new EventBridgeException("Load component properties failed.", e); + } catch (Throwable e) { + throw e; + } + } + + private static TargetRunnerConfig buildNewTargetRunnerConfig(String name) { + TargetRunnerConfig targetRunnerConfig = new TargetRunnerConfig(); + targetRunnerConfig.setName(name); + List<Map<String, String>> components = Lists.newArrayList(); + Map<String, String> component = Maps.newHashMap(); + component.put("K1", UUID.randomUUID().toString()); + components.add(component); + targetRunnerConfig.setComponents(components); + return targetRunnerConfig; + } + + private static TargetRunnerConfig buildNewTargetRunnerConfig() { + return buildNewTargetRunnerConfig(UUID.randomUUID().toString()); + } + +} \ No newline at end of file diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TestConstants.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TestConstants.java new file mode 100644 index 0000000..3e8d40b --- /dev/null +++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TestConstants.java @@ -0,0 +1,7 @@ +package org.apache.rocketmq.eventbridge.runtimer.service; + +public class TestConstants { + public static final String DEMO_TARGET_RUNNER_NAME = "demo"; + + public static final String DEMO_TARGET_RUNNER_CONFIG_FILE_NAME = "target-runner.json"; +} \ No newline at end of file
