This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new cc87850 1,master and worker register ip use OSUtils.getHost()
2,ProcessInstance host set ip:port format (#2209)
cc87850 is described below
commit cc878505cdac486be5c3efd818e4c0ca9e51c0b4
Author: qiaozhanwei <[email protected]>
AuthorDate: Tue Mar 17 16:38:00 2020 +0800
1,master and worker register ip use OSUtils.getHost() 2,ProcessInstance
host set ip:port format (#2209)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatch task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replace
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task priority refactor
* remove ITaskQueue
* task priority refactor
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency; MR, Spark tasks etc. need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
Co-authored-by: qiaozhanwei <[email protected]>
---
.../server/master/registry/MasterRegistry.java | 5 ++---
.../server/master/runner/MasterSchedulerService.java | 10 +++++++++-
.../server/worker/registry/WorkerRegistry.java | 5 ++---
3 files changed, 13 insertions(+), 7 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index 0402520..b658298 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -21,7 +21,6 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@@ -79,7 +78,7 @@ public class MasterRegistry {
* registry
*/
public void registry() {
- String address = Constants.LOCAL_ADDRESS;
+ String address = OSUtils.getHost();
String localNodePath = getMasterPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath,
"");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new
ConnectionStateListener() {
@@ -125,7 +124,7 @@ public class MasterRegistry {
* @return
*/
private String getLocalAddress(){
- return Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort();
+ return OSUtils.getHost() + ":" + masterConfig.getListenPort();
}
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 4ba0d48..2a0e0a9 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -76,6 +76,7 @@ public class MasterSchedulerService extends Thread {
*/
private ThreadPoolExecutor masterExecService;
+
/**
* constructor of MasterSchedulerThread
*/
@@ -122,7 +123,10 @@ public class MasterSchedulerService extends Thread {
logger.info(String.format("find one command: id: %d,
type: %s", command.getId(),command.getCommandType().toString()));
try{
- ProcessInstance processInstance =
processService.handleCommand(logger, OSUtils.getHost(),
this.masterConfig.getMasterExecThreads() - activeCount, command);
+
+ ProcessInstance processInstance =
processService.handleCommand(logger,
+ getLocalAddress(),
+ this.masterConfig.getMasterExecThreads() -
activeCount, command);
if (processInstance != null) {
logger.info("start master exec thread , split
DAG ...");
masterExecService.execute(new
MasterExecThread(processInstance, processService, nettyRemotingClient));
@@ -143,4 +147,8 @@ public class MasterSchedulerService extends Thread {
}
}
}
+
+ private String getLocalAddress(){
+ return OSUtils.getHost() + ":" + masterConfig.getListenPort();
+ }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index b42386a..4d72340 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -22,7 +22,6 @@ import
org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -86,7 +85,7 @@ public class WorkerRegistry {
* registry
*/
public void registry() {
- String address = Constants.LOCAL_ADDRESS;
+ String address = OSUtils.getHost();
String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath,
"");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new
ConnectionStateListener() {
@@ -142,7 +141,7 @@ public class WorkerRegistry {
* @return
*/
private String getLocalAddress(){
- return Constants.LOCAL_ADDRESS + ":" + workerConfig.getListenPort();
+ return OSUtils.getHost() + ":" + workerConfig.getListenPort();
}
/**