imaffe commented on a change in pull request #623:
URL: https://github.com/apache/rocketmq-externals/pull/623#discussion_r471296688



##########
File path: 
rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
##########
@@ -107,10 +125,53 @@ public void stop() {
         taskKeyValueStore.persist();
     }
 
+
+    /**
+     * Check if Cluster has leader
+     *
+     * @return workerID if Cluster has leader or this worker is leader
+     */
+    public void checkLeaderState() throws ConnectException {
+        if (connectConfig.getIsLeader() == 1) {
+            log.info("This worker is a leader, leaderID is " + 
connectConfig.getWorkerID());
+            connectConfig.setLeaderID(connectConfig.getWorkerID());
+            sendOnlineConfig();
+        }
+        else {
+            if (connectConfig.getLeaderID() != null) {
+                log.info("This worker is a follower, leader is " + 
connectConfig.getLeaderID());
+                connectConfig.setLeaderID(connectConfig.getLeaderID());
+            }
+            else

Review comment:
       we always use {} even it has only one line

##########
File path: 
rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
##########
@@ -169,6 +230,7 @@ public String putConnectorConfig(String connectorName, 
ConnectKeyValue configs)
         if (errorMessage != null && errorMessage.length() > 0) {
             return errorMessage;
         }
+        // TODO is this the problem ? Put is executed after remove ?

Review comment:
       Some TODO are left in code, try to reduce them as much ap

##########
File path: 
rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
##########
@@ -111,6 +121,46 @@ public void setNamesrvAddr(String namesrvAddr) {
         this.namesrvAddr = namesrvAddr;
     }
 
+    public int getIsLeader() {
+        return isLeader;
+    }
+
+    public void setIsLeader(int isLeader) {

Review comment:
       Instead of using isLeader, perhaps using a single variable workerRole 
(or any name you prefer) might be better, and role == 1 means leader, role == 0 
means slave, and define these mapping using `public static final int `

##########
File path: 
rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
##########
@@ -107,10 +125,53 @@ public void stop() {
         taskKeyValueStore.persist();
     }
 
+
+    /**
+     * Check if Cluster has leader
+     *
+     * @return workerID if Cluster has leader or this worker is leader
+     */
+    public void checkLeaderState() throws ConnectException {
+        if (connectConfig.getIsLeader() == 1) {
+            log.info("This worker is a leader, leaderID is " + 
connectConfig.getWorkerID());
+            connectConfig.setLeaderID(connectConfig.getWorkerID());
+            sendOnlineConfig();
+        }
+        else {
+            if (connectConfig.getLeaderID() != null) {
+                log.info("This worker is a follower, leader is " + 
connectConfig.getLeaderID());
+                connectConfig.setLeaderID(connectConfig.getLeaderID());
+            }
+            else
+                throw new ConnectException("leader status error");
+        }
+    }
+
+
+    /**
+     * verify the leader in topic
+     *
+     * @return
+     */
+    public boolean checkLeaderState(String leader) {
+        if (leader.equals("")) {
+            log.error("Receive an CONFIG_CHANG_KEY without a leader");

Review comment:
       CONFIG_CHANGE_KEY

##########
File path: 
rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
##########
@@ -57,6 +57,13 @@
      */
     void registerListener(WorkerStatusListener listener);
 
+    /**
+     * Register a leader status listener to listen the change of leader status.
+     *
+     * @param listener
+     */
+    void registerListener(LeaderStatusListener listener);

Review comment:
       Can we refactor these two method so they have different name ? Using 
parameter type to distinguish between 2 method might not be a good idea

##########
File path: 
rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
##########
@@ -49,24 +49,48 @@
     public RestHandler(ConnectController connectController) {
         this.connectController = connectController;
         Javalin app = 
Javalin.start(connectController.getConnectConfig().getHttpPort());
-        app.get("/connectors/stopAll", this::handleStopAllConnector);
-        app.get("/connectors/pauseAll", this::handlePauseAllConnector);
-        app.get("/connectors/resumeAll", this::handleResumeAllConnector);
-        app.get("/connectors/enableAll", this::handleEnableAllConnector);
-        app.get("/connectors/disableAll", this::handleDisableAllConnector);
-        app.get("/connectors/:connectorName", this::handleCreateConnector);
-        app.get("/connectors/:connectorName/config", 
this::handleQueryConnectorConfig);
-        app.get("/connectors/:connectorName/status", 
this::handleQueryConnectorStatus);
-        app.get("/connectors/:connectorName/stop", this::handleStopConnector);
-        app.get("/connectors/:connectorName/pause", 
this::handlePauseConnector);
-        app.get("/connectors/:connectorName/resume", 
this::handleResumeConnector);
-        app.get("/connectors/:connectorName/enable", 
this::handleEnableConnector);
-        app.get("/connectors/:connectorName/disable", 
this::handleDisableConnector);
-        app.get("/getClusterInfo", this::getClusterInfo);
-        app.get("/getConfigInfo", this::getConfigInfo);
-        app.get("/getAllocatedConnectors", this::getAllocatedConnectors);
-        app.get("/getAllocatedTasks", this::getAllocatedTasks);
-        app.get("/plugin/reload", this::reloadPlugins);
+        if (this.connectController.getConnectConfig().getIsLeader() == 1) {
+            app.get("/connectors/stopAll", this::handleStopAllConnector);
+            app.get("/connectors/pauseAll", this::handlePauseAllConnector);
+            app.get("/connectors/resumeAll", this::handleResumeAllConnector);
+            app.get("/connectors/enableAll", this::handleEnableAllConnector);
+            app.get("/connectors/disableAll", this::handleDisableAllConnector);
+            app.get("/connectors/:connectorName", this::handleCreateConnector);
+            app.get("/connectors/:connectorName/config", 
this::handleQueryConnectorConfig);
+            app.get("/connectors/:connectorName/status", 
this::handleQueryConnectorStatus);
+            app.get("/connectors/:connectorName/stop", 
this::handleStopConnector);
+            app.get("/connectors/:connectorName/pause", 
this::handlePauseConnector);
+            app.get("/connectors/:connectorName/resume", 
this::handleResumeConnector);
+            app.get("/connectors/:connectorName/enable", 
this::handleEnableConnector);
+            app.get("/connectors/:connectorName/disable", 
this::handleDisableConnector);
+            app.get("/getClusterInfo", this::getClusterInfo);
+            app.get("/getConfigInfo", this::getConfigInfo);
+            app.get("/getAllocatedConnectors", this::getAllocatedConnectors);
+            app.get("/getAllocatedTasks", this::getAllocatedTasks);
+            app.get("/plugin/reload", this::reloadPlugins);
+        }
+        else {
+            app.get("/connectors/:connectorName/config", 
this::handleQueryConnectorConfig);
+            app.get("/connectors/:connectorName/status", 
this::handleQueryConnectorStatus);
+            app.get("/getClusterInfo", this::getClusterInfo);
+            app.get("/getConfigInfo", this::getConfigInfo);
+            app.get("/getAllocatedConnectors", this::getAllocatedConnectors);
+            app.get("/getAllocatedTasks", this::getAllocatedTasks);
+            app.get("/plugin/reload", this::reloadPlugins);
+            app.get("/*", this::RedirectionToLeader);
+        }
+    }
+
+    /**
+     * Redirect to the Leader
+     *
+     * @param context
+     */
+    private void RedirectionToLeader(Context context) {
+        String address = 
this.connectController.getConnectConfig().getLeaderID().replace(":", "@");
+        String parm = context.queryString() == null ? "" : "?" + 
context.queryString();

Review comment:
       as per our discussion, redirect is deprecated

##########
File path: 
rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
##########
@@ -141,4 +155,22 @@ public RemotingCommand workerChanged(ChannelHandlerContext 
ctx,
             return false;
         }
     }
+
+    /**
+     * Check whether the leader is down and the master-slave switch occurs
+     *
+     */
+    private void checkClusterLeader() {
+        if (connectConfig.getLeaderID() == null) return;
+        List<String> workers = getAllAliveWorkers();
+        if (connectConfig.getIsLeader() == 1 && 
!workers.contains(connectConfig.getLeaderID() + "")) {

Review comment:
       why do we need  + "" here

##########
File path: 
rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
##########
@@ -107,10 +125,53 @@ public void stop() {
         taskKeyValueStore.persist();
     }
 
+
+    /**
+     * Check if Cluster has leader
+     *
+     * @return workerID if Cluster has leader or this worker is leader
+     */
+    public void checkLeaderState() throws ConnectException {
+        if (connectConfig.getIsLeader() == 1) {
+            log.info("This worker is a leader, leaderID is " + 
connectConfig.getWorkerID());
+            connectConfig.setLeaderID(connectConfig.getWorkerID());
+            sendOnlineConfig();
+        }
+        else {
+            if (connectConfig.getLeaderID() != null) {
+                log.info("This worker is a follower, leader is " + 
connectConfig.getLeaderID());
+                connectConfig.setLeaderID(connectConfig.getLeaderID());
+            }
+            else

Review comment:
       There are other places with similar issues, try to fix them as well~




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to