This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch 1.3.7-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/1.3.7-prepare by this push:
     new 7d500c4  Fix: when the master or worker is down, the alert can't be inserted 
into the DB because the datasource is closed (#5919)
7d500c4 is described below

commit 7d500c45d41b4a39e15c6372aa56863004979753
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jul 30 09:50:47 2021 +0800

    Fix: when the master or worker is down, the alert can't be inserted into the DB 
because the datasource is closed (#5919)
---
 .../server/master/registry/ServerNodeManager.java  | 12 ++++-
 .../server/master/zk/ZKMasterClient.java           | 40 +++++++++-------
 .../service/zk/AbstractListener.java               | 16 ++++++-
 .../service/zk/ZookeeperCachedOperator.java        | 55 ++++++++--------------
 .../service/zk/ZookeeperOperator.java              | 36 +-------------
 .../service/zk/ZookeeperCachedOperatorTest.java}   | 36 +++++++++-----
 pom.xml                                            |  1 +
 7 files changed, 96 insertions(+), 100 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index d713c83..7e0268f 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -139,11 +139,11 @@ public class ServerNodeManager implements 
InitializingBean {
         /**
          * init MasterNodeListener listener
          */
-        registryCenter.getRegisterOperator().addListener(new 
MasterNodeListener());
+        registryCenter.getRegisterOperator().registerListener(new 
MasterNodeListener(Integer.MAX_VALUE));
         /**
          * init WorkerNodeListener listener
          */
-        registryCenter.getRegisterOperator().addListener(new 
WorkerGroupNodeListener());
+        registryCenter.getRegisterOperator().registerListener(new 
WorkerGroupNodeListener(Integer.MAX_VALUE));
     }
 
     /**
@@ -207,6 +207,10 @@ public class ServerNodeManager implements InitializingBean 
{
      */
     class WorkerGroupNodeListener extends AbstractListener {
 
+        public WorkerGroupNodeListener(int order) {
+            super(order);
+        }
+
         @Override
         protected void dataChanged(CuratorFramework client, TreeCacheEvent 
event, String path) {
             if (registryCenter.isWorkerPath(path)) {
@@ -246,6 +250,10 @@ public class ServerNodeManager implements InitializingBean 
{
      */
     class MasterNodeListener extends AbstractListener {
 
+        public MasterNodeListener(int order) {
+            super(order);
+        }
+
         @Override
         protected void dataChanged(CuratorFramework client, TreeCacheEvent 
event, String path) {
             if (registryCenter.isMasterPath(path)) {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java
index 8daf8a1..1f98ca0 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java
@@ -33,6 +33,7 @@ import 
org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.zk.AbstractListener;
 import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
 
 import org.apache.commons.lang.StringUtils;
@@ -97,7 +98,7 @@ public class ZKMasterClient extends AbstractZKClient {
                 removeZKNodePath(null, ZKNodeType.MASTER, true);
                 removeZKNodePath(null, ZKNodeType.WORKER, true);
             }
-            registerListener();
+            registerListener(new NodeChangeListener(Integer.MIN_VALUE));
         } catch (Exception e) {
             logger.error("master start up exception", e);
         } finally {
@@ -115,21 +116,28 @@ public class ZKMasterClient extends AbstractZKClient {
         super.close();
     }
 
-    /**
-     * handle path events that this class cares about
-     *
-     * @param client zkClient
-     * @param event path event
-     * @param path zk path
-     */
-    @Override
-    protected void dataChanged(CuratorFramework client, TreeCacheEvent event, 
String path) {
-        //monitor master
-        if (path.startsWith(getZNodeParentPath(ZKNodeType.MASTER) + 
Constants.SINGLE_SLASH)) {
-            handleMasterEvent(event, path);
-        } else if (path.startsWith(getZNodeParentPath(ZKNodeType.WORKER) + 
Constants.SINGLE_SLASH)) {
-            //monitor worker
-            handleWorkerEvent(event, path);
+    class NodeChangeListener extends AbstractListener {
+
+        public NodeChangeListener(int order) {
+            super(order);
+        }
+
+        /**
+         * handle path events that this class cares about
+         *
+         * @param client zkClient
+         * @param event path event
+         * @param path zk path
+         */
+        @Override
+        protected void dataChanged(CuratorFramework client, TreeCacheEvent 
event, String path) {
+            //monitor master
+            if (path.startsWith(getZNodeParentPath(ZKNodeType.MASTER) + 
Constants.SINGLE_SLASH)) {
+                handleMasterEvent(event, path);
+            } else if (path.startsWith(getZNodeParentPath(ZKNodeType.WORKER) + 
Constants.SINGLE_SLASH)) {
+                //monitor worker
+                handleWorkerEvent(event, path);
+            }
         }
     }
 
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
index 3e3e6c8..f27d322 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
@@ -21,7 +21,16 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 
-public abstract class AbstractListener implements TreeCacheListener {
+public abstract class AbstractListener implements TreeCacheListener, 
Comparable<AbstractListener> {
+
+    /**
+     * The order is represent as prioritization, the high order will be 
executed first
+     */
+    private final int order;
+
+    public AbstractListener(int order) {
+        this.order = order;
+    }
 
     @Override
     public final void childEvent(final CuratorFramework client, final 
TreeCacheEvent event) throws Exception {
@@ -33,4 +42,9 @@ public abstract class AbstractListener implements 
TreeCacheListener {
     }
 
     protected abstract void dataChanged(final CuratorFramework client, final 
TreeCacheEvent event, final String path);
+
+    @Override
+    public int compareTo(AbstractListener o) {
+        return order - o.order;
+    }
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
index 647a3c9..323dcb2 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
@@ -16,37 +16,35 @@
  */
 package org.apache.dolphinscheduler.service.zk;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
-import java.nio.charset.StandardCharsets;
-
 @Component
 public class ZookeeperCachedOperator extends ZookeeperOperator {
 
     private final Logger logger = 
LoggerFactory.getLogger(ZookeeperCachedOperator.class);
 
-
     private TreeCache treeCache;
+
+    /**
+     * The listener list is kept here so that the listeners can be executed 
in a customized order.
+     */
+    private List<AbstractListener> listenerList = new CopyOnWriteArrayList<>();
+
     /**
      * register a unified listener of /${dsRoot},
      */
     @Override
-    protected void registerListener() {
-
-        treeCache.getListenable().addListener((client, event) -> {
-            String path = null == event.getData() ? "" : 
event.getData().getPath();
-            if (path.isEmpty()) {
-                return;
-            }
-            dataChanged(client, event, path);
-        });
+    public void registerListener(AbstractListener abstractListener) {
+        logger.info("register zookeeper listener: {}", 
abstractListener.getClass().getName());
+        listenerList.add(abstractListener);
+        listenerList.sort(AbstractListener::compareTo);
     }
 
     @Override
@@ -59,25 +57,12 @@ public class ZookeeperCachedOperator extends 
ZookeeperOperator {
             logger.error("add listener to zk path: {} failed", 
getZookeeperConfig().getDsRoot());
             throw new RuntimeException(e);
         }
-    }
-
-    //for sub class
-    protected void dataChanged(final CuratorFramework client, final 
TreeCacheEvent event, final String path){}
-
-    public String getFromCache(final String cachePath, final String key) {
-        ChildData resultInCache = treeCache.getCurrentData(key);
-        if (null != resultInCache) {
-            return null == resultInCache.getData() ? null : new 
String(resultInCache.getData(), StandardCharsets.UTF_8);
-        }
-        return null;
-    }
-
-    public TreeCache getTreeCache(final String cachePath) {
-        return treeCache;
-    }
-
-    public void addListener(TreeCacheListener listener){
-        this.treeCache.getListenable().addListener(listener);
+        treeCache.getListenable().addListener(((client, event) -> {
+            for (AbstractListener abstractListener : listenerList) {
+                logger.debug("zookeeperListener:{} triggered", 
abstractListener.getClass().getName());
+                abstractListener.childEvent(client, event);
+            }
+        }));
     }
 
     @Override
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
index dbfb8e2..5591bb4 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
@@ -30,7 +30,6 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -65,7 +64,8 @@ public class ZookeeperOperator implements InitializingBean {
     /**
      * this method is for sub class,
      */
-    protected void registerListener(){}
+    protected void registerListener(AbstractListener abstractListener) {
+    }
 
     protected void treeCacheStart(){}
 
@@ -143,16 +143,6 @@ public class ZookeeperOperator implements InitializingBean 
{
         }
     }
 
-    public boolean hasChildren(final String key) {
-        Stat stat;
-        try {
-            stat = zkClient.checkExists().forPath(key);
-            return stat.getNumChildren() >= 1;
-        } catch (Exception ex) {
-            throw new IllegalStateException(ex);
-        }
-    }
-
     public boolean isExisted(final String key) {
         try {
             return zkClient.checkExists().forPath(key) != null;
@@ -194,28 +184,6 @@ public class ZookeeperOperator implements InitializingBean 
{
         }
     }
 
-    public void persistEphemeral(String key, String value, boolean overwrite) {
-        try {
-            if (overwrite) {
-                persistEphemeral(key, value);
-            } else {
-                if (!isExisted(key)) {
-                    
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key,
 value.getBytes(StandardCharsets.UTF_8));
-                }
-            }
-        } catch (final Exception ex) {
-            logger.error("persistEphemeral key : {} , value : {}, overwrite : 
{}", key, value, overwrite, ex);
-        }
-    }
-
-    public void persistEphemeralSequential(final String key, String value) {
-        try {
-            
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key,
 value.getBytes(StandardCharsets.UTF_8));
-        } catch (final Exception ex) {
-            logger.error("persistEphemeralSequential key : {}", key, ex);
-        }
-    }
-
     public void remove(final String key) {
         try {
             if (isExisted(key)) {
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperatorTest.java
similarity index 52%
copy from 
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
copy to 
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperatorTest.java
index 3e3e6c8..6eb9887 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperatorTest.java
@@ -19,18 +19,30 @@ package org.apache.dolphinscheduler.service.zk;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 
-public abstract class AbstractListener implements TreeCacheListener {
+import org.junit.Assert;
+import org.junit.Test;
 
-    @Override
-    public final void childEvent(final CuratorFramework client, final 
TreeCacheEvent event) throws Exception {
-        String path = null == event.getData() ? "" : event.getData().getPath();
-        if (path.isEmpty()) {
-            return;
-        }
-        dataChanged(client, event, path);
-    }
+public class ZookeeperCachedOperatorTest {
+
+    private ZookeeperCachedOperator zookeeperCachedOperator = new 
ZookeeperCachedOperator();
 
-    protected abstract void dataChanged(final CuratorFramework client, final 
TreeCacheEvent event, final String path);
-}
+    @Test
+    public void testRegisterListener() {
+        AbstractListener abstractListener1 = new AbstractListener(1) {
+            @Override
+            protected void dataChanged(CuratorFramework client, TreeCacheEvent 
event, String path) {
+                // ignore
+            }
+        };
+        AbstractListener abstractListener2 = new AbstractListener(2) {
+            @Override
+            protected void dataChanged(CuratorFramework client, TreeCacheEvent 
event, String path) {
+                // ignore
+            }
+        };
+        zookeeperCachedOperator.registerListener(abstractListener2);
+        zookeeperCachedOperator.registerListener(abstractListener1);
+        Assert.assertTrue(true);
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 65830b6..6c82db9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -847,6 +847,7 @@
                         
<include>**/service/quartz/cron/CronUtilsTest.java</include>
                         
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>
                         <include>**/service/zk/ZKServerTest.java</include>
+                        
<include>**/service/zk/ZookeeperCachedOperatorTest.java</include>
                         
<include>**/service/zk/CuratorZookeeperClientTest.java</include>
                         
<include>**/service/zk/RegisterOperatorTest.java</include>
                         
<include>**/service/queue/TaskUpdateQueueTest.java</include>

Reply via email to