This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch 1.3.7-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.7-prepare by this push:
new 7d500c4 Fix when the master or worker is down, the alert can't be inserted
into the DB, due to the datasource being closed (#5919)
7d500c4 is described below
commit 7d500c45d41b4a39e15c6372aa56863004979753
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jul 30 09:50:47 2021 +0800
Fix when the master or worker is down, the alert can't be inserted into the DB,
due to the datasource being closed (#5919)
---
.../server/master/registry/ServerNodeManager.java | 12 ++++-
.../server/master/zk/ZKMasterClient.java | 40 +++++++++-------
.../service/zk/AbstractListener.java | 16 ++++++-
.../service/zk/ZookeeperCachedOperator.java | 55 ++++++++--------------
.../service/zk/ZookeeperOperator.java | 36 +-------------
.../service/zk/ZookeeperCachedOperatorTest.java} | 36 +++++++++-----
pom.xml | 1 +
7 files changed, 96 insertions(+), 100 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index d713c83..7e0268f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -139,11 +139,11 @@ public class ServerNodeManager implements
InitializingBean {
/**
* init MasterNodeListener listener
*/
- registryCenter.getRegisterOperator().addListener(new
MasterNodeListener());
+ registryCenter.getRegisterOperator().registerListener(new
MasterNodeListener(Integer.MAX_VALUE));
/**
* init WorkerNodeListener listener
*/
- registryCenter.getRegisterOperator().addListener(new
WorkerGroupNodeListener());
+ registryCenter.getRegisterOperator().registerListener(new
WorkerGroupNodeListener(Integer.MAX_VALUE));
}
/**
@@ -207,6 +207,10 @@ public class ServerNodeManager implements InitializingBean
{
*/
class WorkerGroupNodeListener extends AbstractListener {
+ public WorkerGroupNodeListener(int order) {
+ super(order);
+ }
+
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent
event, String path) {
if (registryCenter.isWorkerPath(path)) {
@@ -246,6 +250,10 @@ public class ServerNodeManager implements InitializingBean
{
*/
class MasterNodeListener extends AbstractListener {
+ public MasterNodeListener(int order) {
+ super(order);
+ }
+
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent
event, String path) {
if (registryCenter.isMasterPath(path)) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java
index 8daf8a1..1f98ca0 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java
@@ -33,6 +33,7 @@ import
org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.zk.AbstractListener;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.apache.commons.lang.StringUtils;
@@ -97,7 +98,7 @@ public class ZKMasterClient extends AbstractZKClient {
removeZKNodePath(null, ZKNodeType.MASTER, true);
removeZKNodePath(null, ZKNodeType.WORKER, true);
}
- registerListener();
+ registerListener(new NodeChangeListener(Integer.MIN_VALUE));
} catch (Exception e) {
logger.error("master start up exception", e);
} finally {
@@ -115,21 +116,28 @@ public class ZKMasterClient extends AbstractZKClient {
super.close();
}
- /**
- * handle path events that this class cares about
- *
- * @param client zkClient
- * @param event path event
- * @param path zk path
- */
- @Override
- protected void dataChanged(CuratorFramework client, TreeCacheEvent event,
String path) {
- //monitor master
- if (path.startsWith(getZNodeParentPath(ZKNodeType.MASTER) +
Constants.SINGLE_SLASH)) {
- handleMasterEvent(event, path);
- } else if (path.startsWith(getZNodeParentPath(ZKNodeType.WORKER) +
Constants.SINGLE_SLASH)) {
- //monitor worker
- handleWorkerEvent(event, path);
+ class NodeChangeListener extends AbstractListener {
+
+ public NodeChangeListener(int order) {
+ super(order);
+ }
+
+ /**
+ * handle path events that this class cares about
+ *
+ * @param client zkClient
+ * @param event path event
+ * @param path zk path
+ */
+ @Override
+ protected void dataChanged(CuratorFramework client, TreeCacheEvent
event, String path) {
+ //monitor master
+ if (path.startsWith(getZNodeParentPath(ZKNodeType.MASTER) +
Constants.SINGLE_SLASH)) {
+ handleMasterEvent(event, path);
+ } else if (path.startsWith(getZNodeParentPath(ZKNodeType.WORKER) +
Constants.SINGLE_SLASH)) {
+ //monitor worker
+ handleWorkerEvent(event, path);
+ }
}
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
index 3e3e6c8..f27d322 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
@@ -21,7 +21,16 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
-public abstract class AbstractListener implements TreeCacheListener {
+public abstract class AbstractListener implements TreeCacheListener,
Comparable<AbstractListener> {
+
+ /**
+ * The order is represent as prioritization, the high order will be
executed first
+ */
+ private final int order;
+
+ public AbstractListener(int order) {
+ this.order = order;
+ }
@Override
public final void childEvent(final CuratorFramework client, final
TreeCacheEvent event) throws Exception {
@@ -33,4 +42,9 @@ public abstract class AbstractListener implements
TreeCacheListener {
}
protected abstract void dataChanged(final CuratorFramework client, final
TreeCacheEvent event, final String path);
+
+ @Override
+ public int compareTo(AbstractListener o) {
+ return order - o.order;
+ }
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
index 647a3c9..323dcb2 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
@@ -16,37 +16,35 @@
*/
package org.apache.dolphinscheduler.service.zk;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
-import java.nio.charset.StandardCharsets;
-
@Component
public class ZookeeperCachedOperator extends ZookeeperOperator {
private final Logger logger =
LoggerFactory.getLogger(ZookeeperCachedOperator.class);
-
private TreeCache treeCache;
+
+ /**
+ * The listener list is defined here so that the listeners can be executed
in a customized order.
+ */
+ private List<AbstractListener> listenerList = new CopyOnWriteArrayList<>();
+
/**
* register a unified listener of /${dsRoot},
*/
@Override
- protected void registerListener() {
-
- treeCache.getListenable().addListener((client, event) -> {
- String path = null == event.getData() ? "" :
event.getData().getPath();
- if (path.isEmpty()) {
- return;
- }
- dataChanged(client, event, path);
- });
+ public void registerListener(AbstractListener abstractListener) {
+ logger.info("register zookeeper listener: {}",
abstractListener.getClass().getName());
+ listenerList.add(abstractListener);
+ listenerList.sort(AbstractListener::compareTo);
}
@Override
@@ -59,25 +57,12 @@ public class ZookeeperCachedOperator extends
ZookeeperOperator {
logger.error("add listener to zk path: {} failed",
getZookeeperConfig().getDsRoot());
throw new RuntimeException(e);
}
- }
-
- //for sub class
- protected void dataChanged(final CuratorFramework client, final
TreeCacheEvent event, final String path){}
-
- public String getFromCache(final String cachePath, final String key) {
- ChildData resultInCache = treeCache.getCurrentData(key);
- if (null != resultInCache) {
- return null == resultInCache.getData() ? null : new
String(resultInCache.getData(), StandardCharsets.UTF_8);
- }
- return null;
- }
-
- public TreeCache getTreeCache(final String cachePath) {
- return treeCache;
- }
-
- public void addListener(TreeCacheListener listener){
- this.treeCache.getListenable().addListener(listener);
+ treeCache.getListenable().addListener(((client, event) -> {
+ for (AbstractListener abstractListener : listenerList) {
+ logger.debug("zookeeperListener:{} triggered",
abstractListener.getClass().getName());
+ abstractListener.childEvent(client, event);
+ }
+ }));
}
@Override
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
index dbfb8e2..5591bb4 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
@@ -30,7 +30,6 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -65,7 +64,8 @@ public class ZookeeperOperator implements InitializingBean {
/**
* this method is for sub class,
*/
- protected void registerListener(){}
+ protected void registerListener(AbstractListener abstractListener) {
+ }
protected void treeCacheStart(){}
@@ -143,16 +143,6 @@ public class ZookeeperOperator implements InitializingBean
{
}
}
- public boolean hasChildren(final String key) {
- Stat stat;
- try {
- stat = zkClient.checkExists().forPath(key);
- return stat.getNumChildren() >= 1;
- } catch (Exception ex) {
- throw new IllegalStateException(ex);
- }
- }
-
public boolean isExisted(final String key) {
try {
return zkClient.checkExists().forPath(key) != null;
@@ -194,28 +184,6 @@ public class ZookeeperOperator implements InitializingBean
{
}
}
- public void persistEphemeral(String key, String value, boolean overwrite) {
- try {
- if (overwrite) {
- persistEphemeral(key, value);
- } else {
- if (!isExisted(key)) {
-
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key,
value.getBytes(StandardCharsets.UTF_8));
- }
- }
- } catch (final Exception ex) {
- logger.error("persistEphemeral key : {} , value : {}, overwrite :
{}", key, value, overwrite, ex);
- }
- }
-
- public void persistEphemeralSequential(final String key, String value) {
- try {
-
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key,
value.getBytes(StandardCharsets.UTF_8));
- } catch (final Exception ex) {
- logger.error("persistEphemeralSequential key : {}", key, ex);
- }
- }
-
public void remove(final String key) {
try {
if (isExisted(key)) {
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperatorTest.java
similarity index 52%
copy from
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
copy to
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperatorTest.java
index 3e3e6c8..6eb9887 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperatorTest.java
@@ -19,18 +19,30 @@ package org.apache.dolphinscheduler.service.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
-public abstract class AbstractListener implements TreeCacheListener {
+import org.junit.Assert;
+import org.junit.Test;
- @Override
- public final void childEvent(final CuratorFramework client, final
TreeCacheEvent event) throws Exception {
- String path = null == event.getData() ? "" : event.getData().getPath();
- if (path.isEmpty()) {
- return;
- }
- dataChanged(client, event, path);
- }
+public class ZookeeperCachedOperatorTest {
+
+ private ZookeeperCachedOperator zookeeperCachedOperator = new
ZookeeperCachedOperator();
- protected abstract void dataChanged(final CuratorFramework client, final
TreeCacheEvent event, final String path);
-}
+ @Test
+ public void testRegisterListener() {
+ AbstractListener abstractListener1 = new AbstractListener(1) {
+ @Override
+ protected void dataChanged(CuratorFramework client, TreeCacheEvent
event, String path) {
+ // ignore
+ }
+ };
+ AbstractListener abstractListener2 = new AbstractListener(2) {
+ @Override
+ protected void dataChanged(CuratorFramework client, TreeCacheEvent
event, String path) {
+ // ignore
+ }
+ };
+ zookeeperCachedOperator.registerListener(abstractListener2);
+ zookeeperCachedOperator.registerListener(abstractListener1);
+ Assert.assertTrue(true);
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 65830b6..6c82db9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -847,6 +847,7 @@
<include>**/service/quartz/cron/CronUtilsTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>
<include>**/service/zk/ZKServerTest.java</include>
+
<include>**/service/zk/ZookeeperCachedOperatorTest.java</include>
<include>**/service/zk/CuratorZookeeperClientTest.java</include>
<include>**/service/zk/RegisterOperatorTest.java</include>
<include>**/service/queue/TaskUpdateQueueTest.java</include>