This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 4104d9b RestHandlerTest add unit test (#260)
4104d9b is described below
commit 4104d9b4c3ef4205d409a0107d0080a14e5cbe42
Author: Oliver <[email protected]>
AuthorDate: Mon Aug 29 13:50:12 2022 +0800
RestHandlerTest add unit test (#260)
---
.../connect/runtime/rest/entities/PluginInfo.java | 9 +++
.../connect/runtime/rest/RestHandlerTest.java | 82 +++++++++++++++++++++-
2 files changed, 90 insertions(+), 1 deletion(-)
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/PluginInfo.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/PluginInfo.java
index 007b97c..05e832d 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/PluginInfo.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/PluginInfo.java
@@ -75,4 +75,13 @@ public class PluginInfo {
public int hashCode() {
return Objects.hash(className, type, version);
}
+
+ @Override
+ public String toString() {
+ return "PluginInfo{" +
+ "className='" + className + '\'' +
+ ", type=" + type +
+ ", version='" + version + '\'' +
+ '}';
+ }
}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index 85bd3ac..f94ed14 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
@@ -51,6 +52,7 @@ import
org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSourceTask;
import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerState;
import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.rocketmq.connect.runtime.rest.entities.PluginInfo;
import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
@@ -59,6 +61,7 @@ import
org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -123,6 +126,18 @@ public class RestHandlerTest {
private static final String GET_ALLOCATED_CONNECTORS_URL =
"http://localhost:8081/getAllocatedConnectors";
+ private static final String GET_ALLOCATED_TASKS_URL =
"http://localhost:8081/getAllocatedTasks";
+
+ private static final String QUERY_CONNECTOR_CONFIG_URL =
"http://localhost:8081/connectors/testConnector/config";
+
+ private static final String QUERY_CONNECTOR_STATUS_URL =
"http://localhost:8081/connectors/testConnector/status";
+
+ private static final String STOP_ALL_CONNECTOR_URL =
"http://localhost:8081/connectors/stopAll";
+
+ private static final String PLUGIN_LIST_URL =
"http://localhost:8081/plugin/list";
+
+ private static final String PLUGIN_RELOAD_URL =
"http://localhost:8081/plugin/reload";
+
private HttpClient httpClient;
private List<String> aliveWorker;
@@ -228,8 +243,12 @@ public class RestHandlerTest {
};
when(connectController.getWorker()).thenReturn(worker);
when(worker.getWorkingConnectors()).thenReturn(workerConnectors);
- when(worker.getWorkingTasks()).thenReturn(workerTasks);
+ List<String> pluginPaths = new ArrayList<>();
+ pluginPaths.add("src/test/java/org/apache/rocketmq/connect/runtime");
+ Plugin plugin = new Plugin(pluginPaths);
+ when(connectController.plugin()).thenReturn(plugin);
+ when(configManagementService.getPlugin()).thenReturn(plugin);
restHandler = new RestHandler(connectController);
httpClient = HttpClientBuilder.create().build();
@@ -279,6 +298,67 @@ public class RestHandlerTest {
connectors.put(workerConnector.getConnectorName(),
workerConnector.getKeyValue());
}
assertEquals(JSON.toJSONString(connectors),
EntityUtils.toString(httpResponse4.getEntity(), "UTF-8"));
+
+ URIBuilder uriBuilder5 = new URIBuilder(GET_ALLOCATED_TASKS_URL);
+ URI uri5 = uriBuilder5.build();
+ HttpGet httpGet5 = new HttpGet(uri5);
+ HttpResponse httpResponse5 = httpClient.execute(httpGet5);
+ assertEquals(200, httpResponse5.getStatusLine().getStatusCode());
+ Map<String, Object> formatter2 = new HashMap<>();
+ formatter2.put("pendingTasks", new HashSet<>());
+ formatter2.put("runningTasks", new HashSet<>());
+ formatter2.put("stoppingTasks", new HashSet<>());
+ formatter2.put("stoppedTasks", new HashSet<>());
+ formatter2.put("errorTasks", new HashSet<>());
+ assertEquals(JSON.toJSONString(formatter2),
EntityUtils.toString(httpResponse5.getEntity(), "UTF-8"));
+
+ URIBuilder uriBuilder6 = new URIBuilder(QUERY_CONNECTOR_CONFIG_URL);
+ URI uri6 = uriBuilder6.build();
+ HttpGet httpGet6 = new HttpGet(uri6);
+ HttpResponse httpResponse6 = httpClient.execute(httpGet6);
+ assertEquals(200, httpResponse6.getStatusLine().getStatusCode());
+ String connectorName = "testConnector";
+ Map<String, ConnectKeyValue> connectorConfigs =
connectController.getConfigManagementService().getConnectorConfigs();
+ Map<String, List<ConnectKeyValue>> taskConfigs =
connectController.getConfigManagementService().getTaskConfigs();
+ StringBuilder sb = new StringBuilder();
+ sb.append("ConnectorConfigs:")
+ .append(JSON.toJSONString(connectorConfigs.get(connectorName)))
+ .append("\n")
+ .append("TaskConfigs:")
+ .append(JSON.toJSONString(taskConfigs.get(connectorName)));
+ assertEquals(sb.toString(),
EntityUtils.toString(httpResponse6.getEntity(), "UTF-8"));
+
+ URIBuilder uriBuilder7 = new URIBuilder(QUERY_CONNECTOR_STATUS_URL);
+ URI uri7 = uriBuilder7.build();
+ HttpGet httpGet7 = new HttpGet(uri7);
+ HttpResponse httpResponse7 = httpClient.execute(httpGet7);
+ assertEquals(200, httpResponse7.getStatusLine().getStatusCode());
+ assertEquals("running",
EntityUtils.toString(httpResponse7.getEntity(), "UTF-8"));
+
+ URIBuilder uriBuilder8 = new URIBuilder(STOP_ALL_CONNECTOR_URL);
+ URI uri8 = uriBuilder8.build();
+ HttpGet httpGet8 = new HttpGet(uri8);
+ HttpResponse httpResponse8 = httpClient.execute(httpGet8);
+ assertEquals(200, httpResponse8.getStatusLine().getStatusCode());
+ assertEquals("success",
EntityUtils.toString(httpResponse8.getEntity(), "UTF-8"));
+
+ URIBuilder uriBuilder9 = new URIBuilder(PLUGIN_LIST_URL);
+ URI uri9 = uriBuilder9.build();
+ HttpGet httpGet9 = new HttpGet(uri9);
+ HttpResponse httpResponse9 = httpClient.execute(httpGet9);
+ assertEquals(200, httpResponse9.getStatusLine().getStatusCode());
+ List<PluginInfo> connectorPlugins =
JSON.parseArray(EntityUtils.toString(httpResponse9.getEntity(), "UTF-8"),
PluginInfo.class);
+ final Map<String, PluginInfo> connectorPluginMap =
connectorPlugins.stream().collect(Collectors.toMap(PluginInfo::getClassName,
item -> item, (k1, k2) -> k1));
+
Assert.assertTrue(connectorPluginMap.containsKey("org.apache.rocketmq.connect.runtime.connectorwrapper.TestTransform"));
+
Assert.assertTrue(connectorPluginMap.containsKey("org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConverter"));
+
+ URIBuilder uriBuilder10 = new URIBuilder(PLUGIN_RELOAD_URL);
+ URI uri10 = uriBuilder10.build();
+ HttpGet httpGet10 = new HttpGet(uri10);
+ HttpResponse httpResponse10 = httpClient.execute(httpGet10);
+ assertEquals(200, httpResponse10.getStatusLine().getStatusCode());
+ assertEquals("success",
EntityUtils.toString(httpResponse10.getEntity(), "UTF-8"));
+
}
}
\ No newline at end of file