This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 742f0c3e60 Add workflowInstanceHost and taskInstanceHost in ITaskInstanceExecutionEvent (#14827)
742f0c3e60 is described below
commit 742f0c3e60187efb515977aaf2df0a38119840a3
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Aug 29 13:32:17 2023 +0800
Add workflowInstanceHost and taskInstanceHost in ITaskInstanceExecutionEvent (#14827)
---
.../extract/master/transportor/ITaskInstanceExecutionEvent.java | 8 +++++++-
.../master/transportor/TaskInstanceExecutionFinishEvent.java | 4 +++-
.../master/transportor/TaskInstanceExecutionInfoEvent.java | 4 +++-
.../master/transportor/TaskInstanceExecutionRunningEvent.java | 4 +++-
.../dolphinscheduler/server/master/processor/queue/TaskEvent.java | 6 +++---
.../message/LogicTaskInstanceExecuteRunningEventSender.java | 5 +++--
.../message/LogicTaskInstanceExecutionFinishEventSender.java | 5 +++--
.../server/worker/message/MessageRetryRunner.java | 2 +-
.../worker/message/TaskInstanceExecutionFinishEventSender.java | 5 +++--
.../message/TaskInstanceExecutionInfoUpdateEventSender.java | 5 +++--
.../worker/message/TaskInstanceExecutionRunningEventSender.java | 5 +++--
11 files changed, 35 insertions(+), 18 deletions(-)
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/ITaskInstanceExecutionEvent.java
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/ITaskInstanceExecutionEvent.java
index 3fa5ace8ce..25b7b7e727 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/ITaskInstanceExecutionEvent.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/ITaskInstanceExecutionEvent.java
@@ -27,7 +27,13 @@ public interface ITaskInstanceExecutionEvent {
void setEventSendTime(long eventSendTime);
- void setHost(String host);
+ void setWorkflowInstanceHost(String host);
+
+ String getWorkflowInstanceHost();
+
+ void setTaskInstanceHost(String host);
+
+ String getTaskInstanceHost();
TaskInstanceExecutionEventType getEventType();
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionFinishEvent.java
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionFinishEvent.java
index 33ac1280d0..469412c66e 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionFinishEvent.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionFinishEvent.java
@@ -34,7 +34,9 @@ public class TaskInstanceExecutionFinishEvent implements
ITaskInstanceExecutionE
private long startTime;
- private String host;
+ private String taskInstanceHost;
+
+ private String workflowInstanceHost;
private String logPath;
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionInfoEvent.java
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionInfoEvent.java
index 22039a9e70..1623e03e92 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionInfoEvent.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionInfoEvent.java
@@ -32,7 +32,9 @@ public class TaskInstanceExecutionInfoEvent implements
ITaskInstanceExecutionEve
private long startTime;
- private String host;
+ private String workflowInstanceHost;
+
+ private String taskInstanceHost;
private String logPath;
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionRunningEvent.java
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionRunningEvent.java
index 023d953ad8..4c48a0a381 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionRunningEvent.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionRunningEvent.java
@@ -34,7 +34,9 @@ public class TaskInstanceExecutionRunningEvent implements
ITaskInstanceExecution
private long startTime;
- private String host;
+ private String taskInstanceHost;
+
+ private String workflowInstanceHost;
private TaskExecutionStatus status;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index e55dfbdd5d..be21148ef6 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -114,7 +114,7 @@ public class TaskEvent {
event.setExecutePath(command.getExecutePath());
event.setLogPath(command.getLogPath());
event.setAppIds(command.getAppIds());
- event.setWorkerAddress(command.getHost());
+ event.setWorkerAddress(command.getTaskInstanceHost());
event.setEvent(TaskEventType.RUNNING);
return event;
}
@@ -131,7 +131,7 @@ public class TaskEvent {
event.setProcessId(command.getProcessId());
event.setAppIds(command.getAppIds());
event.setVarPool(command.getVarPool());
- event.setWorkerAddress(command.getHost());
+ event.setWorkerAddress(command.getTaskInstanceHost());
event.setEvent(TaskEventType.RESULT);
return event;
}
@@ -151,7 +151,7 @@ public class TaskEvent {
event.setTaskInstanceId(command.getTaskInstanceId());
event.setStartTime(DateUtils.timeStampToDate(command.getStartTime()));
event.setLogPath(command.getLogPath());
- event.setWorkerAddress(command.getHost());
+ event.setWorkerAddress(command.getTaskInstanceHost());
event.setEvent(TaskEventType.UPDATE_PID);
return event;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java
index 6ea085b31b..9b22486efe 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java
@@ -35,7 +35,7 @@ public class LogicTaskInstanceExecuteRunningEventSender
public void sendMessage(TaskInstanceExecutionRunningEvent
taskInstanceExecutionRunningEvent) {
ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(taskInstanceExecutionRunningEvent.getHost(),
+ .getProxyClient(taskInstanceExecutionRunningEvent.getWorkflowInstanceHost(),
ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionRunning(taskInstanceExecutionRunningEvent);
}
@@ -47,7 +47,8 @@ public class LogicTaskInstanceExecuteRunningEventSender
taskExecuteRunningMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
taskExecuteRunningMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus());
taskExecuteRunningMessage.setLogPath(taskExecutionContext.getLogPath());
- taskExecuteRunningMessage.setHost(taskExecutionContext.getHost());
+ taskExecuteRunningMessage.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost());
+ taskExecuteRunningMessage.setTaskInstanceHost(taskExecutionContext.getHost());
taskExecuteRunningMessage.setStartTime(taskExecutionContext.getStartTime());
taskExecuteRunningMessage.setExecutePath(taskExecutionContext.getExecutePath());
taskExecuteRunningMessage.setAppIds(taskExecutionContext.getAppIds());
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java
index 1949145e86..871a087358 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java
@@ -33,7 +33,7 @@ public class LogicTaskInstanceExecutionFinishEventSender
public void sendMessage(TaskInstanceExecutionFinishEvent message) {
ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(message.getHost(), ITaskInstanceExecutionEventListener.class);
+ .getProxyClient(message.getWorkflowInstanceHost(), ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionFinish(message);
}
@@ -47,7 +47,8 @@ public class LogicTaskInstanceExecutionFinishEventSender
taskExecuteResultMessage.setExecutePath(taskExecutionContext.getExecutePath());
taskExecuteResultMessage.setAppIds(taskExecutionContext.getAppIds());
taskExecuteResultMessage.setProcessId(taskExecutionContext.getProcessId());
- taskExecuteResultMessage.setHost(taskExecutionContext.getHost());
+ taskExecuteResultMessage.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost());
+ taskExecuteResultMessage.setTaskInstanceHost(taskExecutionContext.getHost());
taskExecuteResultMessage.setStartTime(taskExecutionContext.getStartTime());
taskExecuteResultMessage.setEndTime(taskExecutionContext.getEndTime());
taskExecuteResultMessage.setVarPool(taskExecutionContext.getVarPool());
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
index b5a388312b..2b9ce2ec5f 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
@@ -96,7 +96,7 @@ public class MessageRetryRunner extends BaseDaemonThread {
List<TaskInstanceMessage> taskInstanceMessages =
this.needToRetryMessages.get(taskInstanceId);
if (taskInstanceMessages != null) {
taskInstanceMessages.forEach(taskInstanceMessage -> {
- taskInstanceMessage.getEvent().setHost(messageReceiverHost);
+ taskInstanceMessage.getEvent().setWorkflowInstanceHost(messageReceiverHost);
});
}
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java
index 9469ba1a69..ae371ec4bf 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java
@@ -34,7 +34,7 @@ public class TaskInstanceExecutionFinishEventSender
public void sendEvent(TaskInstanceExecutionFinishEvent
taskInstanceExecutionFinishEvent) {
ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(taskInstanceExecutionFinishEvent.getHost(),
+ .getProxyClient(taskInstanceExecutionFinishEvent.getWorkflowInstanceHost(),
ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionFinish(taskInstanceExecutionFinishEvent);
}
@@ -49,7 +49,8 @@ public class TaskInstanceExecutionFinishEventSender
taskExecuteResultMessage.setExecutePath(taskExecutionContext.getExecutePath());
taskExecuteResultMessage.setAppIds(taskExecutionContext.getAppIds());
taskExecuteResultMessage.setProcessId(taskExecutionContext.getProcessId());
- taskExecuteResultMessage.setHost(taskExecutionContext.getHost());
+ taskExecuteResultMessage.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost());
+ taskExecuteResultMessage.setTaskInstanceHost(taskExecutionContext.getHost());
taskExecuteResultMessage.setStartTime(taskExecutionContext.getStartTime());
taskExecuteResultMessage.setEndTime(taskExecutionContext.getEndTime());
taskExecuteResultMessage.setVarPool(taskExecutionContext.getVarPool());
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java
index 4b9e7e76b0..62649ae445 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java
@@ -36,7 +36,7 @@ public class TaskInstanceExecutionInfoUpdateEventSender
public void sendEvent(TaskInstanceExecutionInfoEvent
taskInstanceExecutionInfoEvent) {
ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(taskInstanceExecutionInfoEvent.getHost(),
+ .getProxyClient(taskInstanceExecutionInfoEvent.getWorkflowInstanceHost(),
ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionInfoUpdate(taskInstanceExecutionInfoEvent);
}
@@ -46,7 +46,8 @@ public class TaskInstanceExecutionInfoUpdateEventSender
TaskInstanceExecutionInfoEvent taskUpdatePidRequest = new
TaskInstanceExecutionInfoEvent();
taskUpdatePidRequest.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskUpdatePidRequest.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
- taskUpdatePidRequest.setHost(taskExecutionContext.getHost());
+ taskUpdatePidRequest.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost());
+ taskUpdatePidRequest.setTaskInstanceHost(taskExecutionContext.getHost());
taskUpdatePidRequest.setStartTime(taskExecutionContext.getStartTime());
return taskUpdatePidRequest;
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java
index 4f64a94002..8ae8ca85cb 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java
@@ -36,7 +36,7 @@ public class TaskInstanceExecutionRunningEventSender
public void sendEvent(TaskInstanceExecutionRunningEvent
taskInstanceExecutionRunningEvent) {
ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(taskInstanceExecutionRunningEvent.getHost(),
+ .getProxyClient(taskInstanceExecutionRunningEvent.getWorkflowInstanceHost(),
ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionRunning(taskInstanceExecutionRunningEvent);
}
@@ -48,7 +48,8 @@ public class TaskInstanceExecutionRunningEventSender
taskExecuteRunningMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
taskExecuteRunningMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus());
taskExecuteRunningMessage.setLogPath(taskExecutionContext.getLogPath());
- taskExecuteRunningMessage.setHost(taskExecutionContext.getHost());
+ taskExecuteRunningMessage.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost());
+ taskExecuteRunningMessage.setTaskInstanceHost(taskExecutionContext.getHost());
taskExecuteRunningMessage.setStartTime(taskExecutionContext.getStartTime());
taskExecuteRunningMessage.setExecutePath(taskExecutionContext.getExecutePath());
taskExecuteRunningMessage.setAppIds(taskExecutionContext.getAppIds());