Dreaouth commented on a change in pull request #623:
URL: https://github.com/apache/rocketmq-externals/pull/623#discussion_r472293533
##########
File path:
rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
##########
@@ -49,24 +49,48 @@
public RestHandler(ConnectController connectController) {
this.connectController = connectController;
Javalin app =
Javalin.start(connectController.getConnectConfig().getHttpPort());
- app.get("/connectors/stopAll", this::handleStopAllConnector);
- app.get("/connectors/pauseAll", this::handlePauseAllConnector);
- app.get("/connectors/resumeAll", this::handleResumeAllConnector);
- app.get("/connectors/enableAll", this::handleEnableAllConnector);
- app.get("/connectors/disableAll", this::handleDisableAllConnector);
- app.get("/connectors/:connectorName", this::handleCreateConnector);
- app.get("/connectors/:connectorName/config",
this::handleQueryConnectorConfig);
- app.get("/connectors/:connectorName/status",
this::handleQueryConnectorStatus);
- app.get("/connectors/:connectorName/stop", this::handleStopConnector);
- app.get("/connectors/:connectorName/pause",
this::handlePauseConnector);
- app.get("/connectors/:connectorName/resume",
this::handleResumeConnector);
- app.get("/connectors/:connectorName/enable",
this::handleEnableConnector);
- app.get("/connectors/:connectorName/disable",
this::handleDisableConnector);
- app.get("/getClusterInfo", this::getClusterInfo);
- app.get("/getConfigInfo", this::getConfigInfo);
- app.get("/getAllocatedConnectors", this::getAllocatedConnectors);
- app.get("/getAllocatedTasks", this::getAllocatedTasks);
- app.get("/plugin/reload", this::reloadPlugins);
+ if (this.connectController.getConnectConfig().getIsLeader() == 1) {
+ app.get("/connectors/stopAll", this::handleStopAllConnector);
+ app.get("/connectors/pauseAll", this::handlePauseAllConnector);
+ app.get("/connectors/resumeAll", this::handleResumeAllConnector);
+ app.get("/connectors/enableAll", this::handleEnableAllConnector);
+ app.get("/connectors/disableAll", this::handleDisableAllConnector);
+ app.get("/connectors/:connectorName", this::handleCreateConnector);
+ app.get("/connectors/:connectorName/config",
this::handleQueryConnectorConfig);
+ app.get("/connectors/:connectorName/status",
this::handleQueryConnectorStatus);
+ app.get("/connectors/:connectorName/stop",
this::handleStopConnector);
+ app.get("/connectors/:connectorName/pause",
this::handlePauseConnector);
+ app.get("/connectors/:connectorName/resume",
this::handleResumeConnector);
+ app.get("/connectors/:connectorName/enable",
this::handleEnableConnector);
+ app.get("/connectors/:connectorName/disable",
this::handleDisableConnector);
+ app.get("/getClusterInfo", this::getClusterInfo);
+ app.get("/getConfigInfo", this::getConfigInfo);
+ app.get("/getAllocatedConnectors", this::getAllocatedConnectors);
+ app.get("/getAllocatedTasks", this::getAllocatedTasks);
+ app.get("/plugin/reload", this::reloadPlugins);
+ }
+ else {
+ app.get("/connectors/:connectorName/config",
this::handleQueryConnectorConfig);
+ app.get("/connectors/:connectorName/status",
this::handleQueryConnectorStatus);
+ app.get("/getClusterInfo", this::getClusterInfo);
+ app.get("/getConfigInfo", this::getConfigInfo);
+ app.get("/getAllocatedConnectors", this::getAllocatedConnectors);
+ app.get("/getAllocatedTasks", this::getAllocatedTasks);
+ app.get("/plugin/reload", this::reloadPlugins);
+ app.get("/*", this::RedirectionToLeader);
+ }
+ }
+
+ /**
+ * Redirect to the Leader
+ *
+ * @param context
+ */
+ private void RedirectionToLeader(Context context) {
+ String address =
this.connectController.getConnectConfig().getLeaderID().replace(":", "@");
+ String parm = context.queryString() == null ? "" : "?" +
context.queryString();
Review comment:
Updated. Use gRPC instead
##########
File path:
rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
##########
@@ -107,10 +125,53 @@ public void stop() {
taskKeyValueStore.persist();
}
+
+ /**
+ * Check if Cluster has leader
+ *
+ * @return workerID if Cluster has leader or this worker is leader
+ */
+ public void checkLeaderState() throws ConnectException {
+ if (connectConfig.getIsLeader() == 1) {
+ log.info("This worker is a leader, leaderID is " +
connectConfig.getWorkerID());
+ connectConfig.setLeaderID(connectConfig.getWorkerID());
+ sendOnlineConfig();
+ }
+ else {
+ if (connectConfig.getLeaderID() != null) {
+ log.info("This worker is a follower, leader is " +
connectConfig.getLeaderID());
+ connectConfig.setLeaderID(connectConfig.getLeaderID());
+ }
+ else
Review comment:
Updated all
##########
File path:
rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
##########
@@ -107,10 +125,53 @@ public void stop() {
taskKeyValueStore.persist();
}
+
+ /**
+ * Check if Cluster has leader
+ *
+ * @return workerID if Cluster has leader or this worker is leader
+ */
+ public void checkLeaderState() throws ConnectException {
+ if (connectConfig.getIsLeader() == 1) {
+ log.info("This worker is a leader, leaderID is " +
connectConfig.getWorkerID());
+ connectConfig.setLeaderID(connectConfig.getWorkerID());
+ sendOnlineConfig();
+ }
+ else {
+ if (connectConfig.getLeaderID() != null) {
+ log.info("This worker is a follower, leader is " +
connectConfig.getLeaderID());
+ connectConfig.setLeaderID(connectConfig.getLeaderID());
+ }
+ else
+ throw new ConnectException("leader status error");
+ }
+ }
+
+
+ /**
+ * verify the leader in topic
+ *
+ * @return
+ */
+ public boolean checkLeaderState(String leader) {
+ if (leader.equals("")) {
+ log.error("Receive an CONFIG_CHANG_KEY without a leader");
Review comment:
Updated
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]