This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 4aac234 [Fix-4757][worker] fix worker failover host error (#4799)
4aac234 is described below
commit 4aac23481ede8de93e09241d6ae9dd0c16077e1b
Author: zhuangchong <[email protected]>
AuthorDate: Sun Feb 21 00:34:23 2021 +0800
[Fix-4757][worker] fix worker failover host error (#4799)
* resolve code conflicts
* resolve code conflicts
* resolve code conflicts
* update WorkerGroupServiceImpl code style.
* update worker group service test host data.
* add ZookeeperNodeHandlerTest class
* change WorkerZkNode to Host class.
* add generate host string method.
---
.../api/service/impl/WorkerGroupServiceImpl.java | 46 ++++++++++++---------
.../api/service/WorkerGroupServiceTest.java | 4 +-
.../apache/dolphinscheduler/remote/utils/Host.java | 48 ++++++++++++++++++----
.../server/worker/registry/WorkerRegistry.java | 17 ++------
.../dolphinscheduler/server/zk/ZKMasterClient.java | 27 ++++++------
.../dolphinscheduler/server/utils/HostTest.java | 9 ++++
6 files changed, 94 insertions(+), 57 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 81d923e..2f899ed 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.SLASH;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
@@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import java.util.ArrayList;
@@ -135,6 +137,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
* @return WorkerGroup list
*/
private List<WorkerGroup> getWorkerGroups(boolean isPaging) {
+
String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
List<WorkerGroup> workerGroups = new ArrayList<>();
List<String> workerGroupList;
@@ -142,38 +145,41 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath);
} catch (Exception e) {
if (e.getMessage().contains(NO_NODE_EXCEPTION_REGEX)) {
- if (!isPaging) {
- //ignore noNodeException return Default
- WorkerGroup wg = new WorkerGroup();
- wg.setName(DEFAULT_WORKER_GROUP);
- workerGroups.add(wg);
+ if (isPaging) {
+ return workerGroups;
}
+
+ //ignore noNodeException return Default
+ WorkerGroup wg = new WorkerGroup();
+ wg.setName(DEFAULT_WORKER_GROUP);
+ workerGroups.add(wg);
return workerGroups;
+
} else {
throw e;
}
}
for (String workerGroup : workerGroupList) {
- String workerGroupPath = String.format("%s/%s", workerPath, workerGroup);
+ String workerGroupPath = workerPath + SLASH + workerGroup;
List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
- String timeStamp = "";
+ if (CollectionUtils.isEmpty(childrenNodes)) {
+ continue;
+ }
+ String timeStamp = childrenNodes.get(0);
for (int i = 0; i < childrenNodes.size(); i++) {
- String ip = childrenNodes.get(i);
- childrenNodes.set(i, ip.substring(0, ip.lastIndexOf(":")));
- timeStamp = ip.substring(ip.lastIndexOf(":"));
+ childrenNodes.set(i, Host.of(childrenNodes.get(i)).getAddressAndWeight());
}
- if (CollectionUtils.isNotEmpty(childrenNodes)) {
- WorkerGroup wg = new WorkerGroup();
- wg.setName(workerGroup);
- if (isPaging) {
- wg.setIpList(childrenNodes);
- String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0) + timeStamp);
- wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6]));
- wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7]));
- }
- workerGroups.add(wg);
+
+ WorkerGroup wg = new WorkerGroup();
+ wg.setName(workerGroup);
+ if (isPaging) {
+ wg.setIpList(childrenNodes);
+ String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + SLASH + timeStamp);
+ wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6]));
+ wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7]));
}
+ workerGroups.add(wg);
}
return workerGroups;
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index 93463dc..db9bb4f 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -71,8 +71,8 @@ public class WorkerGroupServiceTest {
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenReturn(workerGroupStrList);
List<String> defaultIpList = new ArrayList<>();
- defaultIpList.add("192.168.220.188:1234");
- defaultIpList.add("192.168.220.189:1234");
+ defaultIpList.add("192.168.220.188:1234:100:1234567");
+ defaultIpList.add("192.168.220.189:1234:100:1234567");
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultIpList);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index c18d02f..7e42984 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -17,8 +17,11 @@
package org.apache.dolphinscheduler.remote.utils;
+import static org.apache.dolphinscheduler.common.Constants.COLON;
+
import java.io.Serializable;
import java.util.Objects;
+import java.util.StringJoiner;
/**
* server address
@@ -61,13 +64,13 @@ public class Host implements Serializable {
public Host(String ip, int port) {
this.ip = ip;
this.port = port;
- this.address = ip + ":" + port;
+ this.address = ip + COLON + port;
}
public Host(String ip, int port, int weight, long startTime) {
this.ip = ip;
this.port = port;
- this.address = ip + ":" + port;
+ this.address = ip + COLON + port;
this.weight = getWarmUpWeight(weight, startTime);
this.startTime = startTime;
}
@@ -75,7 +78,7 @@ public class Host implements Serializable {
public Host(String ip, int port, int weight, long startTime, String workGroup) {
this.ip = ip;
this.port = port;
- this.address = ip + ":" + port;
+ this.address = ip + COLON + port;
this.weight = getWarmUpWeight(weight, startTime);
this.workGroup = workGroup;
this.startTime = startTime;
@@ -95,7 +98,7 @@ public class Host implements Serializable {
public void setIp(String ip) {
this.ip = ip;
- this.address = ip + ":" + port;
+ this.address = ip + COLON + port;
}
public int getWeight() {
@@ -120,7 +123,7 @@ public class Host implements Serializable {
public void setPort(int port) {
this.port = port;
- this.address = ip + ":" + port;
+ this.address = ip + COLON + port;
}
public String getWorkGroup() {
@@ -141,7 +144,7 @@ public class Host implements Serializable {
if (address == null) {
throw new IllegalArgumentException("Host : address is null.");
}
- String[] parts = address.split(":");
+ String[] parts = address.split(COLON);
if (parts.length < 2) {
throw new IllegalArgumentException(String.format("Host : %s illegal.", address));
}
@@ -156,13 +159,28 @@ public class Host implements Serializable {
}
/**
+ * generate host string
+ * @param address address
+ * @param weight weight
+ * @param startTime startTime
+ * @return address:weight:startTime
+ */
+ public static String generate(String address, int weight, long startTime) {
+ StringJoiner stringJoiner = new StringJoiner(COLON);
+ stringJoiner.add(address)
+ .add(String.valueOf(weight))
+ .add(String.valueOf(startTime));
+ return stringJoiner.toString();
+ }
+
+ /**
* whether old version
*
* @param address address
* @return old version is true , otherwise is false
*/
public static Boolean isOldVersion(String address) {
- String[] parts = address.split(":");
+ String[] parts = address.split(COLON);
return parts.length != 2 && parts.length != 3;
}
@@ -186,8 +204,11 @@ public class Host implements Serializable {
@Override
public String toString() {
return "Host{"
- + "address='" + address + '\''
- + '}';
+ + "address='" + address + '\''
+ + ", weight=" + weight
+ + ", startTime=" + startTime
+ + ", workGroup='" + workGroup + '\''
+ + '}';
}
/**
@@ -201,4 +222,13 @@ public class Host implements Serializable {
}
return weight;
}
+
+ /**
+ * get address and weight
+ *
+ * @return address:weight
+ */
+ public String getAddressAndWeight() {
+ return address + COLON + weight;
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index e779d5d..3d4d73f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -17,13 +17,13 @@
package org.apache.dolphinscheduler.server.worker.registry;
-import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SLASH;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@@ -146,8 +146,8 @@ public class WorkerRegistry {
String address = getLocalAddress();
String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath();
- String weight = getWorkerWeight();
- String workerStartTime = COLON + System.currentTimeMillis();
+ int weight = workerConfig.getWeight();
+ long workerStartTime = System.currentTimeMillis();
for (String workGroup : this.workerGroups) {
StringBuilder workerZkPathBuilder = new StringBuilder(100);
@@ -157,9 +157,7 @@ public class WorkerRegistry {
}
// trim and lower case is need
workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH);
- workerZkPathBuilder.append(address);
- workerZkPathBuilder.append(weight);
- workerZkPathBuilder.append(workerStartTime);
+ workerZkPathBuilder.append(Host.generate(address, weight, workerStartTime));
workerZkPaths.add(workerZkPathBuilder.toString());
}
return workerZkPaths;
@@ -172,11 +170,4 @@ public class WorkerRegistry {
return NetUtils.getAddr(workerConfig.getListenPort());
}
- /**
- * get Worker Weight
- */
- private String getWorkerWeight() {
- return COLON + workerConfig.getWeight();
- }
-
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index f6d4d0d..37484da 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -14,12 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.zk;
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -27,24 +25,27 @@ import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import java.util.Date;
import java.util.List;
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
/**
* zookeeper master client
@@ -134,9 +135,9 @@ public class ZKMasterClient extends AbstractZKClient {
mutex.acquire();
String serverHost = null;
- if(StringUtils.isNotEmpty(path)){
+ if (StringUtils.isNotEmpty(path)) {
serverHost = getHostByEventDataPath(path);
- if(StringUtils.isEmpty(serverHost)){
+ if (StringUtils.isEmpty(serverHost)) {
logger.error("server down error: unknown path: {}", path);
return;
}
@@ -305,8 +306,8 @@ public class ZKMasterClient extends AbstractZKClient {
* @throws Exception exception
*/
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
+ workerHost = Host.of(workerHost).getAddress();
logger.info("start worker[{}] failover ...", workerHost);
-
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
if (needCheckWorkerAlive) {
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
index 6273569..80ff11e 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
@@ -40,4 +40,13 @@ public class HostTest {
Host host = Host.of("192.158.2.2:22");
Assert.assertEquals(22, host.getPort());
}
+
+ @Test
+ public void testGenerate() {
+ String address = "192.158.2.2:22";
+ int weight = 100;
+ long startTime = System.currentTimeMillis();
+ String generateHost = Host.generate(address, weight, startTime);
+ Assert.assertEquals(address + ":" + weight + ":" + startTime, generateHost);
+ }
}