This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch feat/runtime-manager
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/feat/runtime-manager by this
push:
new 2da769b feat:add test case.
2da769b is described below
commit 2da769bb2a91b0bea5726ba43fbe5620e1498d1c
Author: 2011shenlin <[email protected]>
AuthorDate: Wed Jan 17 16:39:54 2024 +0800
feat:add test case.
---
.../ClusterSelectorService.java} | 14 ++--
.../runtime/manager/cluster/ClusterService.java | 3 +-
.../manager/dispatch/RunnerTaskDispatcher.java | 31 +++++++--
...nnerTaskRebalance.java => WorkerRebalance.java} | 2 +-
.../{ClusterWorkerScale.java => ClusterScale.java} | 4 +-
.../runtime/manager/task/RunnerTaskService.java | 9 +++
.../adapter/runtime/manager/watch/WatchWorker.java | 3 +-
.../WorkerLoadService.java} | 11 ++--
.../WorkerSelectorService.java} | 26 +++++---
.../WorkerInstanceRepositoryOnK8STest.java | 73 +++++++++++++++++++++
rocketmq_eventbridge.mv.db | Bin 81920 -> 77824 bytes
11 files changed, 147 insertions(+), 29 deletions(-)
diff --git
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterSelectorService.java
similarity index 73%
copy from
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
copy to
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterSelectorService.java
index 67254db..6ba2766 100644
---
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
+++
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterSelectorService.java
@@ -15,14 +15,18 @@
* limitations under the License.
*/
-package org.apache.rocketmq.eventbridge.adapter.runtime.manager.task;
+package org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster;
-import java.util.List;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.task.RunnerTask;
+import org.springframework.stereotype.Service;
-public class RunnerTaskService {
+@Service
+public class ClusterSelectorService {
+ public Cluster selectCluster(RunnerTask runnerTask) {
+ return selectDefaultCluster();
+ }
- List<RunnerTask> listRunnerTask(String runnerName) {
+ public Cluster selectDefaultCluster() {
return null;
}
-
}
\ No newline at end of file
diff --git
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterService.java
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterService.java
index 7ac59ca..ef27461 100644
---
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterService.java
+++
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterService.java
@@ -38,10 +38,11 @@ public class ClusterService {
return true;
}
- public Cluster getCluster(String clusterName) {
+ public Cluster getCluster(long clusterId) {
return null;
}
+
public List<Cluster> listCluster() {
return clusterRepository.listCluster();
}
diff --git
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java
index 605d38f..696265b 100644
---
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java
+++
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java
@@ -17,16 +17,39 @@
package org.apache.rocketmq.eventbridge.adapter.runtime.manager.dispatch;
+import javax.annotation.Resource;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.Cluster;
+import
org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.ClusterSelectorService;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.task.RunnerTask;
+import
org.apache.rocketmq.eventbridge.adapter.runtime.manager.task.RunnerTaskService;
import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.Worker;
+import
org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerSelectorService;
public class RunnerTaskDispatcher {
- public boolean dispatchToCluster(Worker worker) {
- return false;
+ @Resource
+ ClusterSelectorService clusterSelectorService;
+
+ @Resource
+ WorkerSelectorService workerSelectorService;
+
+ @Resource
+ RunnerTaskService runnerTaskService;
+
+ public boolean dispatchRunnerTask(RunnerTask runnerTask) {
+ boolean dispatchToCluster = dispatchToCluster(runnerTask);
+ boolean dispatchToWorker = dispatchToWorker(runnerTask);
+ return dispatchToCluster && dispatchToWorker;
+ }
+
+ public boolean dispatchToCluster(RunnerTask runnerTask) {
+ Cluster cluster = clusterSelectorService.selectCluster(runnerTask);
+ return runnerTaskService.updateRunnerTaskCluster(runnerTask, cluster);
}
- public boolean dispatchToWorker(Worker worker) {
- return false;
+ public boolean dispatchToWorker(RunnerTask runnerTask) {
+ Worker worker = workerSelectorService.selectWorker(runnerTask);
+ return runnerTaskService.updateRunnerTaskWorker(runnerTask, worker);
}
}
\ No newline at end of file
diff --git
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskRebalance.java
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/WorkerRebalance.java
similarity index 96%
rename from
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskRebalance.java
rename to
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/WorkerRebalance.java
index 5abdb47..052bb82 100644
---
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskRebalance.java
+++
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/WorkerRebalance.java
@@ -19,7 +19,7 @@ package
org.apache.rocketmq.eventbridge.adapter.runtime.manager.dispatch;
import org.springframework.beans.factory.annotation.Autowired;
-public class RunnerTaskRebalance {
+public class WorkerRebalance {
@Autowired
private RunnerTaskDispatcher runnerTaskDispatcher;
diff --git
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterWorkerScale.java
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterScale.java
similarity index 95%
rename from
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterWorkerScale.java
rename to
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterScale.java
index be22fa0..866695b 100644
---
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterWorkerScale.java
+++
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterScale.java
@@ -28,7 +28,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
-public class ClusterWorkerScale {
+public class ClusterScale {
@Autowired
ClusterService clusterService;
@@ -37,7 +37,7 @@ public class ClusterWorkerScale {
private int DEFAULT_SCALE_DOWN_TRIGGER_LOAD = 20;
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryImpl(ClusterWorkerScale.class.getSimpleName()));
+ new ThreadFactoryImpl(ClusterScale.class.getSimpleName()));
public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
diff --git
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
index 67254db..90df069 100644
---
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
+++
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
@@ -18,6 +18,8 @@
package org.apache.rocketmq.eventbridge.adapter.runtime.manager.task;
import java.util.List;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.Cluster;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.Worker;
public class RunnerTaskService {
@@ -25,4 +27,11 @@ public class RunnerTaskService {
return null;
}
+ public boolean updateRunnerTaskWorker(RunnerTask task, Worker worker) {
+ return true;
+ }
+
+ public boolean updateRunnerTaskCluster(RunnerTask task, Cluster cluster) {
+ return true;
+ }
}
\ No newline at end of file
diff --git
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
index c00a694..b507128 100644
---
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
+++
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
@@ -57,9 +57,8 @@ public class WatchWorker {
if (!workerService.isFinalState(worker)) {
Map<String, Object> environments = new
Gson().fromJson(worker.getConfig(), new TypeToken<Map<String, Object>>() {
}.getType());
-// WorkerResource workerResource = new
Gson().fromJson(worker.getResources(), WorkerResource.class);
log.info("applyWorkerInstance, workerName: {},
workerImageTag: {}, workerResource: {}, environments: {}", worker.getName(),
worker.getImage(), worker.getResources(), new Gson().toJson(environments));
-//
workerInstanceRepository.applyWorkerInstance(worker.getName(),
worker.getImageTag(), workerResource, environments);
+//
workerInstanceRepository.applyWorkerInstance(worker.getName(),
worker.getImage(), new Gson().fromJson(worker.getResources(),
WorkerResource.class), environments);
workerService.refreshMD5(worker);
}
diff --git
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerLoadService.java
similarity index 79%
copy from
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
copy to
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerLoadService.java
index 67254db..47f8e62 100644
---
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
+++
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerLoadService.java
@@ -15,14 +15,15 @@
* limitations under the License.
*/
-package org.apache.rocketmq.eventbridge.adapter.runtime.manager.task;
+package org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker;
-import java.util.List;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.Cluster;
+import org.springframework.stereotype.Service;
-public class RunnerTaskService {
+@Service
+public class WorkerLoadService {
- List<RunnerTask> listRunnerTask(String runnerName) {
+ public Worker getMinLoadWorker(Cluster cluster) {
return null;
}
-
}
\ No newline at end of file
diff --git
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerSelectorService.java
similarity index 55%
copy from
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java
copy to
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerSelectorService.java
index 605d38f..abba9d1 100644
---
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java
+++
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerSelectorService.java
@@ -15,18 +15,26 @@
* limitations under the License.
*/
-package org.apache.rocketmq.eventbridge.adapter.runtime.manager.dispatch;
+package org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker;
-import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.Worker;
+import javax.annotation.Resource;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.Cluster;
+import
org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.ClusterService;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.task.RunnerTask;
+import org.springframework.stereotype.Service;
-public class RunnerTaskDispatcher {
+@Service
+public class WorkerSelectorService {
- public boolean dispatchToCluster(Worker worker) {
- return false;
- }
+ @Resource
+ WorkerLoadService workerLoadService;
- public boolean dispatchToWorker(Worker worker) {
- return false;
- }
+ @Resource
+ ClusterService clusterService;
+ public Worker selectWorker(RunnerTask runnerTask) {
+ Cluster cluster = clusterService.getCluster(runnerTask.getClusterId());
+ Worker worker = workerLoadService.getMinLoadWorker(cluster);
+ return worker;
+ }
}
\ No newline at end of file
diff --git
a/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
b/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
new file mode 100644
index 0000000..d96438b
--- /dev/null
+++
b/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
@@ -0,0 +1,73 @@
+package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.repository;
+
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import java.util.Map;
+import
org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerResource;
+import org.junit.Ignore;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+
+
+public class WorkerInstanceRepositoryOnK8STest {
+
+ @InjectMocks
+ private WorkerInstanceRepositoryOnK8S workerInstanceRepositoryOnK8S;
+
+ @Test
+ @Ignore
+ void applyWorkerInstance() {
+ Map<String, Object> environments = Maps.newHashMap();
+ environments.put("key1", "value1");
+ environments.put("key2", "value2");
+ workerInstanceRepositoryOnK8S.applyWorkerInstance("worker-4",
"registry.cn-beijing.cr.aliyuncs.com/eventbridge:20231115195431f55971", new
Gson().fromJson("{\"cpu\":1,\"memory\":1}", WorkerResource.class), null);
+ }
+
+ @Test
+ @Ignore
+ void deleteWorkerInstance() {
+ workerInstanceRepositoryOnK8S.deleteWorkerInstance("worker-4");
+ }
+
+ @Test
+ @Ignore
+ void getWorkerInstanceStatus() {
+ workerInstanceRepositoryOnK8S.getWorkerInstanceStatus("worker-4");
+ }
+
+ @Test
+ @Ignore
+ void applyWorkerInstanceConfigFile() {
+ String taskConfig = "[\n" +
+ " {\n" +
+ " \"name\":\"demo-runner\",\n" +
+ " \"components\":[\n" +
+ " {\n" +
+ " \"accountId\": \"654321\",\n" +
+ " \"eventBusName\":\"demo-bus\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"filterPattern\":\"{}\",\n" +
+ "
\"class\":\"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeFilterTransform\"\n"
+
+ " },\n" +
+ " {\n" +
+ "
\"data\":\"{\\\"form\\\":\\\"TEMPLATE\\\",\\\"value\\\":\\\"{\\\\\\\"content\\\\\\\":\\\\\\\"$.data.body\\\\\\\"}\\\",\\\"template\\\":\\\"{\\\\\\\"text\\\\\\\":{\\\\\\\"content\\\\\\\":\\\\\\\"${content}\\\\\\\"},\\\\\\\"msgtype\\\\\\\":\\\\\\\"text\\\\\\\"}\\\"}\",\n"
+
+ " \"class\":
\"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform\"\n" +
+ " },\n" +
+ " {\n" +
+ "
\"class\":\"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkTask\",\n" +
+ " \"webHook\":\"xxxxxxxxxxx\",\n" +
+ " \"secretKey\":\"xxxxxxxxxxx\"\n" +
+ " }\n" +
+ " ]\n" +
+ " }\n" +
+ "]";
+
workerInstanceRepositoryOnK8S.applyWorkerInstanceConfigFile("worker-4",
"/eventbridge/task-config", taskConfig);
+ }
+
+ @Test
+ @Ignore
+ void getWorkerInstanceConfigFile() {
+ workerInstanceRepositoryOnK8S.getWorkerInstanceConfigFile("worker-4",
"/eventbridge/task-config");
+ }
+}
\ No newline at end of file
diff --git a/rocketmq_eventbridge.mv.db b/rocketmq_eventbridge.mv.db
index 1a098df..2e5c25d 100644
Binary files a/rocketmq_eventbridge.mv.db and b/rocketmq_eventbridge.mv.db
differ