This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5e935d5  refactor AbstractZKClient (#1627)
5e935d5 is described below

commit 5e935d579eec25701a6ce1d29fea95b161173ba3
Author: Tboy <[email protected]>
AuthorDate: Mon Dec 30 17:59:06 2019 +0800

    refactor AbstractZKClient (#1627)
    
    * we should insert alert DB once , and trigger this type of alert 3 times
    
    * refactor AbstractZKClient
---
 .../common/zk/AbstractZKClient.java                | 96 ++++++++--------------
 1 file changed, 36 insertions(+), 60 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
index 0e95ddd..c3ba718 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
@@ -16,6 +16,22 @@
  */
 package org.apache.dolphinscheduler.common.zk;
 
+import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.ZKNodeType;
@@ -23,26 +39,9 @@ import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.ResInfo;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
-
 
 /**
  * abstract zookeeper client
@@ -70,8 +69,7 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator{
                                return;
                        }
 
-                       byte[] bytes = zkClient.getData().forPath(znode);
-                       String resInfoStr = new String(bytes);
+                       String resInfoStr = super.get(znode);
                        String[] splits = resInfoStr.split(Constants.COMMA);
                        if (splits.length != 
Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
                                return;
@@ -107,8 +105,7 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator{
                String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX 
: WORKER_PREFIX;
                String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH 
+ type + UNDERLINE + ipSeqNo;
 
-               if(zkClient.checkExists().forPath(zNode) == null ||
-                               zkClient.checkExists().forPath(deadServerPath) 
!= null ){
+               if(!isExisted(zNode) || isExisted(deadServerPath)){
                        return true;
                }
 
@@ -118,14 +115,12 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator{
 
 
        public void removeDeadServerByHost(String host, String serverType) 
throws Exception {
-        List<String> deadServers = 
zkClient.getChildren().forPath(getDeadZNodeParentPath());
+        List<String> deadServers = 
super.getChildrenKeys(getDeadZNodeParentPath());
         for(String serverPath : deadServers){
             if(serverPath.startsWith(serverType+UNDERLINE+host)){
-                               String server = getDeadZNodeParentPath() + 
SINGLE_SLASH + serverPath;
-                               if(zkClient.checkExists().forPath(server) != 
null){
-                                       zkClient.delete().forPath(server);
-                                       logger.info("{} server {} deleted from 
zk dead server path success" , serverType , host);
-                               }
+                                                       String server = 
getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
+              super.remove(server);
+                                                       logger.info("{} server 
{} deleted from zk dead server path success" , serverType , host);
             }
         }
        }
@@ -143,8 +138,8 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator{
                // create temporary sequence nodes for master znode
                String parentPath = getZNodeParentPath(zkNodeType);
                String serverPathPrefix = parentPath + "/" + OSUtils.getHost();
-               String registerPath = 
zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
-                               serverPathPrefix + UNDERLINE, 
heartbeatZKInfo.getBytes());
+    String registerPath = serverPathPrefix + UNDERLINE;
+    super.persistEphemeral(registerPath, heartbeatZKInfo);
                logger.info("register {} node {} success" , 
zkNodeType.toString(), registerPath);
                return registerPath;
        }
@@ -165,7 +160,7 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator{
                }
                registerPath = createZNodePath(zkNodeType);
 
-        // handle dead server
+    // handle dead server
                handleDeadServer(registerPath, zkNodeType, 
Constants.DELETE_ZK_OP);
 
                return registerPath;
@@ -196,10 +191,10 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator{
 
                }else if(opType.equals(ADD_ZK_OP)){
                        String deadServerPath = getDeadZNodeParentPath() + 
SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
-                       if(zkClient.checkExists().forPath(deadServerPath) == 
null){
+                       if(!super.isExisted(deadServerPath)){
                                //add dead server info to zk dead server path : 
/dead-servers/
 
-                               zkClient.create().forPath(deadServerPath,(type 
+ UNDERLINE + ipSeqNo).getBytes());
+                               super.persist(deadServerPath,(type + UNDERLINE 
+ ipSeqNo));
 
                                logger.info("{} server dead , and {} added to 
zk dead server path success" ,
                                                zkNodeType.toString(), zNode);
@@ -226,19 +221,13 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator{
                List<String> childrenList = new ArrayList<>();
                try {
                        // read master node parent path from conf
-                       
if(zkClient.checkExists().forPath(getZNodeParentPath(ZKNodeType.MASTER)) != 
null){
-                               childrenList = 
zkClient.getChildren().forPath(getZNodeParentPath(ZKNodeType.MASTER));
+                       
if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){
+                               childrenList = 
super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
                        }
                } catch (Exception e) {
-                       
if(e.getMessage().contains("java.lang.IllegalStateException: instance must be 
started")){
-                               logger.error("zookeeper service not started",e);
-                       }else{
-                               logger.error(e.getMessage(),e);
-                       }
-
-               }finally {
-                       return childrenList.size();
+                       logger.error("getActiveMasterNum error",e);
                }
+               return childrenList.size();
        }
 
        /**
@@ -280,10 +269,9 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator{
                Map<String, String> masterMap = new HashMap<>();
                try {
                        String path =  getZNodeParentPath(zkNodeType);
-                       List<String> serverList  = 
getZkClient().getChildren().forPath(path);
+                       List<String> serverList  = super.getChildrenKeys(path);
                        for(String server : serverList){
-                               byte[] bytes  = 
getZkClient().getData().forPath(path + "/" + server);
-                               masterMap.putIfAbsent(server, new 
String(bytes));
+                               masterMap.putIfAbsent(server, super.get(path + 
"/" + server));
                        }
                } catch (Exception e) {
                        logger.error("get server list failed : " + 
e.getMessage(), e);
@@ -430,9 +418,9 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator{
         */
        protected void initSystemZNode(){
                try {
-                       createNodePath(getMasterZNodeParentPath());
-                       createNodePath(getWorkerZNodeParentPath());
-                       createNodePath(getDeadZNodeParentPath());
+                       persist(getMasterZNodeParentPath(), "");
+                       persist(getWorkerZNodeParentPath(), "");
+                       persist(getDeadZNodeParentPath(), "");
 
                } catch (Exception e) {
                        logger.error("init system znode failed : " + 
e.getMessage(),e);
@@ -440,18 +428,6 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator{
        }
 
        /**
-        * create zookeeper node path if not exists
-        * @param zNodeParentPath zookeeper parent path
-        * @throws Exception errors
-        */
-       private void createNodePath(String zNodeParentPath) throws Exception {
-           if(null == zkClient.checkExists().forPath(zNodeParentPath)){
-               zkClient.create().creatingParentContainersIfNeeded()
-                                       
.withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath);
-               }
-       }
-
-       /**
         * server self dead, stop all threads
         * @param serverHost server host
         * @param zkNodeType zookeeper node type

Reply via email to