This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 4104d9b  RestHandlerTest add unit test (#260)
4104d9b is described below

commit 4104d9b4c3ef4205d409a0107d0080a14e5cbe42
Author: Oliver <[email protected]>
AuthorDate: Mon Aug 29 13:50:12 2022 +0800

    RestHandlerTest add unit test (#260)
---
 .../connect/runtime/rest/entities/PluginInfo.java  |  9 +++
 .../connect/runtime/rest/RestHandlerTest.java      | 82 +++++++++++++++++++++-
 2 files changed, 90 insertions(+), 1 deletion(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/PluginInfo.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/PluginInfo.java
index 007b97c..05e832d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/PluginInfo.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/PluginInfo.java
@@ -75,4 +75,13 @@ public class PluginInfo {
     public int hashCode() {
         return Objects.hash(className, type, version);
     }
+
+    @Override
+    public String toString() {
+        return "PluginInfo{" +
+            "className='" + className + '\'' +
+            ", type=" + type +
+            ", version='" + version + '\'' +
+            '}';
+    }
 }
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index 85bd3ac..f94ed14 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
@@ -51,6 +52,7 @@ import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSourceTask;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerState;
 import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
 import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.rocketmq.connect.runtime.rest.entities.PluginInfo;
 import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
 import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
@@ -59,6 +61,7 @@ import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
 import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -123,6 +126,18 @@ public class RestHandlerTest {
 
     private static final String GET_ALLOCATED_CONNECTORS_URL = "http://localhost:8081/getAllocatedConnectors";
 
+    private static final String GET_ALLOCATED_TASKS_URL = "http://localhost:8081/getAllocatedTasks";
+
+    private static final String QUERY_CONNECTOR_CONFIG_URL = "http://localhost:8081/connectors/testConnector/config";
+
+    private static final String QUERY_CONNECTOR_STATUS_URL = "http://localhost:8081/connectors/testConnector/status";
+
+    private static final String STOP_ALL_CONNECTOR_URL = "http://localhost:8081/connectors/stopAll";
+
+    private static final String PLUGIN_LIST_URL = "http://localhost:8081/plugin/list";
+
+    private static final String PLUGIN_RELOAD_URL = "http://localhost:8081/plugin/reload";
+
     private HttpClient httpClient;
 
     private List<String> aliveWorker;
@@ -228,8 +243,12 @@ public class RestHandlerTest {
         };
         when(connectController.getWorker()).thenReturn(worker);
         when(worker.getWorkingConnectors()).thenReturn(workerConnectors);
-        when(worker.getWorkingTasks()).thenReturn(workerTasks);
 
+        List<String> pluginPaths = new ArrayList<>();
+        pluginPaths.add("src/test/java/org/apache/rocketmq/connect/runtime");
+        Plugin plugin = new Plugin(pluginPaths);
+        when(connectController.plugin()).thenReturn(plugin);
+        when(configManagementService.getPlugin()).thenReturn(plugin);
         restHandler = new RestHandler(connectController);
 
         httpClient = HttpClientBuilder.create().build();
@@ -279,6 +298,67 @@ public class RestHandlerTest {
             connectors.put(workerConnector.getConnectorName(), workerConnector.getKeyValue());
         }
         assertEquals(JSON.toJSONString(connectors), EntityUtils.toString(httpResponse4.getEntity(), "UTF-8"));
+
+        URIBuilder uriBuilder5 = new URIBuilder(GET_ALLOCATED_TASKS_URL);
+        URI uri5 = uriBuilder5.build();
+        HttpGet httpGet5 = new HttpGet(uri5);
+        HttpResponse httpResponse5 = httpClient.execute(httpGet5);
+        assertEquals(200, httpResponse5.getStatusLine().getStatusCode());
+        Map<String, Object> formatter2 = new HashMap<>();
+        formatter2.put("pendingTasks", new HashSet<>());
+        formatter2.put("runningTasks",  new HashSet<>());
+        formatter2.put("stoppingTasks",  new HashSet<>());
+        formatter2.put("stoppedTasks",  new HashSet<>());
+        formatter2.put("errorTasks",  new HashSet<>());
+        assertEquals(JSON.toJSONString(formatter2), EntityUtils.toString(httpResponse5.getEntity(), "UTF-8"));
+
+        URIBuilder uriBuilder6 = new URIBuilder(QUERY_CONNECTOR_CONFIG_URL);
+        URI uri6 = uriBuilder6.build();
+        HttpGet httpGet6 = new HttpGet(uri6);
+        HttpResponse httpResponse6 = httpClient.execute(httpGet6);
+        assertEquals(200, httpResponse6.getStatusLine().getStatusCode());
+        String connectorName = "testConnector";
+        Map<String, ConnectKeyValue> connectorConfigs = connectController.getConfigManagementService().getConnectorConfigs();
+        Map<String, List<ConnectKeyValue>> taskConfigs = connectController.getConfigManagementService().getTaskConfigs();
+        StringBuilder sb = new StringBuilder();
+        sb.append("ConnectorConfigs:")
+            .append(JSON.toJSONString(connectorConfigs.get(connectorName)))
+            .append("\n")
+            .append("TaskConfigs:")
+            .append(JSON.toJSONString(taskConfigs.get(connectorName)));
+        assertEquals(sb.toString(), EntityUtils.toString(httpResponse6.getEntity(), "UTF-8"));
+
+        URIBuilder uriBuilder7 = new URIBuilder(QUERY_CONNECTOR_STATUS_URL);
+        URI uri7 = uriBuilder7.build();
+        HttpGet httpGet7 = new HttpGet(uri7);
+        HttpResponse httpResponse7 = httpClient.execute(httpGet7);
+        assertEquals(200, httpResponse7.getStatusLine().getStatusCode());
+        assertEquals("running", EntityUtils.toString(httpResponse7.getEntity(), "UTF-8"));
+
+        URIBuilder uriBuilder8 = new URIBuilder(STOP_ALL_CONNECTOR_URL);
+        URI uri8 = uriBuilder8.build();
+        HttpGet httpGet8 = new HttpGet(uri8);
+        HttpResponse httpResponse8 = httpClient.execute(httpGet8);
+        assertEquals(200, httpResponse8.getStatusLine().getStatusCode());
+        assertEquals("success", EntityUtils.toString(httpResponse8.getEntity(), "UTF-8"));
+
+        URIBuilder uriBuilder9 = new URIBuilder(PLUGIN_LIST_URL);
+        URI uri9 = uriBuilder9.build();
+        HttpGet httpGet9 = new HttpGet(uri9);
+        HttpResponse httpResponse9 = httpClient.execute(httpGet9);
+        assertEquals(200, httpResponse9.getStatusLine().getStatusCode());
+        List<PluginInfo> connectorPlugins = JSON.parseArray(EntityUtils.toString(httpResponse9.getEntity(), "UTF-8"), PluginInfo.class);
+        final Map<String, PluginInfo> connectorPluginMap = connectorPlugins.stream().collect(Collectors.toMap(PluginInfo::getClassName, item -> item, (k1, k2) -> k1));
+        Assert.assertTrue(connectorPluginMap.containsKey("org.apache.rocketmq.connect.runtime.connectorwrapper.TestTransform"));
+        Assert.assertTrue(connectorPluginMap.containsKey("org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConverter"));
+
+        URIBuilder uriBuilder10 = new URIBuilder(PLUGIN_RELOAD_URL);
+        URI uri10 = uriBuilder10.build();
+        HttpGet httpGet10 = new HttpGet(uri10);
+        HttpResponse httpResponse10 = httpClient.execute(httpGet10);
+        assertEquals(200, httpResponse10.getStatusLine().getStatusCode());
+        assertEquals("success", EntityUtils.toString(httpResponse10.getEntity(), "UTF-8"));
+
     }
 
 }
\ No newline at end of file

Reply via email to