This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4aac234  [Fix-4757][worker] fix worker failover host error (#4799)
4aac234 is described below

commit 4aac23481ede8de93e09241d6ae9dd0c16077e1b
Author: zhuangchong <[email protected]>
AuthorDate: Sun Feb 21 00:34:23 2021 +0800

    [Fix-4757][worker] fix worker failover host error (#4799)
    
    * resolve code conflicts
    
    * resolve code conflicts
    
    * resolve code conflicts
    
    * update WorkerGroupServiceImpl code style.
    
    * update worker group service test host data.
    
    * add ZookeeperNodeHandlerTest class
    
    * change WorkerZkNode to Host class.
    
    * add generate host string method.
---
 .../api/service/impl/WorkerGroupServiceImpl.java   | 46 ++++++++++++---------
 .../api/service/WorkerGroupServiceTest.java        |  4 +-
 .../apache/dolphinscheduler/remote/utils/Host.java | 48 ++++++++++++++++++----
 .../server/worker/registry/WorkerRegistry.java     | 17 ++------
 .../dolphinscheduler/server/zk/ZKMasterClient.java | 27 ++++++------
 .../dolphinscheduler/server/utils/HostTest.java    |  9 ++++
 6 files changed, 94 insertions(+), 57 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 81d923e..2f899ed 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.api.service.impl;
 
 import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.SLASH;
 
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.WorkerGroupService;
@@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
 
 import java.util.ArrayList;
@@ -135,6 +137,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
      * @return WorkerGroup list
      */
     private List<WorkerGroup> getWorkerGroups(boolean isPaging) {
+
         String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
         List<WorkerGroup> workerGroups = new ArrayList<>();
         List<String> workerGroupList;
@@ -142,38 +145,41 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
             workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath);
         } catch (Exception e) {
             if (e.getMessage().contains(NO_NODE_EXCEPTION_REGEX)) {
-                if (!isPaging) {
-                    //ignore noNodeException return Default
-                    WorkerGroup wg = new WorkerGroup();
-                    wg.setName(DEFAULT_WORKER_GROUP);
-                    workerGroups.add(wg);
+                if (isPaging) {
+                    return workerGroups;
                 }
+
+                //ignore noNodeException return Default
+                WorkerGroup wg = new WorkerGroup();
+                wg.setName(DEFAULT_WORKER_GROUP);
+                workerGroups.add(wg);
                 return workerGroups;
+
             } else {
                 throw e;
             }
         }
 
         for (String workerGroup : workerGroupList) {
-            String workerGroupPath = String.format("%s/%s", workerPath, workerGroup);
+            String workerGroupPath = workerPath + SLASH + workerGroup;
             List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
-            String timeStamp = "";
+            if (CollectionUtils.isEmpty(childrenNodes)) {
+                continue;
+            }
+            String timeStamp = childrenNodes.get(0);
             for (int i = 0; i < childrenNodes.size(); i++) {
-                String ip = childrenNodes.get(i);
-                childrenNodes.set(i, ip.substring(0, ip.lastIndexOf(":")));
-                timeStamp = ip.substring(ip.lastIndexOf(":"));
+                childrenNodes.set(i, Host.of(childrenNodes.get(i)).getAddressAndWeight());
             }
-            if (CollectionUtils.isNotEmpty(childrenNodes)) {
-                WorkerGroup wg = new WorkerGroup();
-                wg.setName(workerGroup);
-                if (isPaging) {
-                    wg.setIpList(childrenNodes);
-                    String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0) + timeStamp);
-                    wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6]));
-                    wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7]));
-                }
-                workerGroups.add(wg);
+
+            WorkerGroup wg = new WorkerGroup();
+            wg.setName(workerGroup);
+            if (isPaging) {
+                wg.setIpList(childrenNodes);
+                String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + SLASH + timeStamp);
+                wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6]));
+                wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7]));
             }
+            workerGroups.add(wg);
         }
         return workerGroups;
     }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index 93463dc..db9bb4f 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -71,8 +71,8 @@ public class WorkerGroupServiceTest {
         Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenReturn(workerGroupStrList);
 
         List<String> defaultIpList = new ArrayList<>();
-        defaultIpList.add("192.168.220.188:1234");
-        defaultIpList.add("192.168.220.189:1234");
+        defaultIpList.add("192.168.220.188:1234:100:1234567");
+        defaultIpList.add("192.168.220.189:1234:100:1234567");
 
         Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultIpList);
 
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index c18d02f..7e42984 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -17,8 +17,11 @@
 
 package org.apache.dolphinscheduler.remote.utils;
 
+import static org.apache.dolphinscheduler.common.Constants.COLON;
+
 import java.io.Serializable;
 import java.util.Objects;
+import java.util.StringJoiner;
 
 /**
  * server address
@@ -61,13 +64,13 @@ public class Host implements Serializable {
     public Host(String ip, int port) {
         this.ip = ip;
         this.port = port;
-        this.address = ip + ":" + port;
+        this.address = ip + COLON + port;
     }
 
     public Host(String ip, int port, int weight, long startTime) {
         this.ip = ip;
         this.port = port;
-        this.address = ip + ":" + port;
+        this.address = ip + COLON + port;
         this.weight = getWarmUpWeight(weight, startTime);
         this.startTime = startTime;
     }
@@ -75,7 +78,7 @@ public class Host implements Serializable {
     public Host(String ip, int port, int weight, long startTime, String workGroup) {
         this.ip = ip;
         this.port = port;
-        this.address = ip + ":" + port;
+        this.address = ip + COLON + port;
         this.weight = getWarmUpWeight(weight, startTime);
         this.workGroup = workGroup;
         this.startTime = startTime;
@@ -95,7 +98,7 @@ public class Host implements Serializable {
 
     public void setIp(String ip) {
         this.ip = ip;
-        this.address = ip + ":" + port;
+        this.address = ip + COLON + port;
     }
 
     public int getWeight() {
@@ -120,7 +123,7 @@ public class Host implements Serializable {
 
     public void setPort(int port) {
         this.port = port;
-        this.address = ip + ":" + port;
+        this.address = ip + COLON + port;
     }
 
     public String getWorkGroup() {
@@ -141,7 +144,7 @@ public class Host implements Serializable {
         if (address == null) {
             throw new IllegalArgumentException("Host : address is null.");
         }
-        String[] parts = address.split(":");
+        String[] parts = address.split(COLON);
         if (parts.length < 2) {
             throw new IllegalArgumentException(String.format("Host : %s illegal.", address));
         }
@@ -156,13 +159,28 @@ public class Host implements Serializable {
     }
 
     /**
+     * generate host string
+     * @param address address
+     * @param weight weight
+     * @param startTime startTime
+     * @return address:weight:startTime
+     */
+    public static String generate(String address, int weight, long startTime) {
+        StringJoiner stringJoiner = new StringJoiner(COLON);
+        stringJoiner.add(address)
+                .add(String.valueOf(weight))
+                .add(String.valueOf(startTime));
+        return stringJoiner.toString();
+    }
+
+    /**
      * whether old version
      *
      * @param address address
      * @return old version is true , otherwise is false
      */
     public static Boolean isOldVersion(String address) {
-        String[] parts = address.split(":");
+        String[] parts = address.split(COLON);
         return parts.length != 2 && parts.length != 3;
     }
 
@@ -186,8 +204,11 @@ public class Host implements Serializable {
     @Override
     public String toString() {
         return "Host{"
-            + "address='" + address + '\''
-            + '}';
+                + "address='" + address + '\''
+                + ", weight=" + weight
+                + ", startTime=" + startTime
+                + ", workGroup='" + workGroup + '\''
+                + '}';
     }
 
     /**
@@ -201,4 +222,13 @@ public class Host implements Serializable {
         }
         return weight;
     }
+
+    /**
+     * get address and weight
+     *
+     * @return address:weight
+     */
+    public String getAddressAndWeight() {
+        return address + COLON + weight;
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index e779d5d..3d4d73f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -17,13 +17,13 @@
 
 package org.apache.dolphinscheduler.server.worker.registry;
 
-import static org.apache.dolphinscheduler.common.Constants.COLON;
 import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
 import static org.apache.dolphinscheduler.common.Constants.SLASH;
 
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@@ -146,8 +146,8 @@ public class WorkerRegistry {
 
         String address = getLocalAddress();
         String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath();
-        String weight = getWorkerWeight();
-        String workerStartTime = COLON + System.currentTimeMillis();
+        int weight = workerConfig.getWeight();
+        long workerStartTime = System.currentTimeMillis();
 
         for (String workGroup : this.workerGroups) {
             StringBuilder workerZkPathBuilder = new StringBuilder(100);
@@ -157,9 +157,7 @@ public class WorkerRegistry {
             }
             // trim and lower case is need
             workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH);
-            workerZkPathBuilder.append(address);
-            workerZkPathBuilder.append(weight);
-            workerZkPathBuilder.append(workerStartTime);
+            workerZkPathBuilder.append(Host.generate(address, weight, workerStartTime));
             workerZkPaths.add(workerZkPathBuilder.toString());
         }
         return workerZkPaths;
@@ -172,11 +170,4 @@ public class WorkerRegistry {
         return NetUtils.getAddr(workerConfig.getListenPort());
     }
 
-    /**
-     * get Worker Weight
-     */
-    private String getWorkerWeight() {
-        return COLON + workerConfig.getWeight();
-    }
-
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index f6d4d0d..37484da 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -14,12 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.zk;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -27,24 +25,27 @@ import org.apache.dolphinscheduler.common.enums.ZKNodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 
 import java.util.Date;
 import java.util.List;
 
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 /**
  * zookeeper master client
@@ -134,9 +135,9 @@ public class ZKMasterClient extends AbstractZKClient {
             mutex.acquire();
 
             String serverHost = null;
-            if(StringUtils.isNotEmpty(path)){
+            if (StringUtils.isNotEmpty(path)) {
                 serverHost = getHostByEventDataPath(path);
-                if(StringUtils.isEmpty(serverHost)){
+                if (StringUtils.isEmpty(serverHost)) {
                     logger.error("server down error: unknown path: {}", path);
                     return;
                 }
@@ -305,8 +306,8 @@ public class ZKMasterClient extends AbstractZKClient {
      * @throws Exception exception
      */
     private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
+        workerHost = Host.of(workerHost).getAddress();
         logger.info("start worker[{}] failover ...", workerHost);
-
         List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
         for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
             if (needCheckWorkerAlive) {
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
index 6273569..80ff11e 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
@@ -40,4 +40,13 @@ public class HostTest {
         Host host = Host.of("192.158.2.2:22");
         Assert.assertEquals(22, host.getPort());
     }
+
+    @Test
+    public void testGenerate() {
+        String address = "192.158.2.2:22";
+        int weight = 100;
+        long startTime = System.currentTimeMillis();
+        String generateHost = Host.generate(address, weight, startTime);
+        Assert.assertEquals(address + ":" + weight + ":" + startTime, generateHost);
+    }
 }

Reply via email to