This is an automated email from the ASF dual-hosted git repository.
changhaifu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new c7a11ce6ed [Improvement][K8S]Optimize MDC for K8S tasks (#15390)
c7a11ce6ed is described below
commit c7a11ce6ed8b606671b4f1fa06b522a72aed50d4
Author: Gallardot <[email protected]>
AuthorDate: Wed Jan 17 17:56:11 2024 +0800
[Improvement][K8S]Optimize MDC for K8S tasks (#15390)
Signed-off-by: Gallardot <[email protected]>
Co-authored-by: fuchanghai <[email protected]>
---
.../plugin/task/api/k8s/impl/K8sTaskExecutor.java | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
index 167cae8668..986c9dc8a7 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
@@ -205,6 +205,8 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
@Override
public void eventReceived(Action action, Job job) {
try {
+
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
+ taskRequest.getTaskInstanceId());
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
log.info("event received : job:{} action:{}",
job.getMetadata().getName(), action);
if (action == Action.DELETED) {
@@ -222,14 +224,18 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
}
} finally {
LogUtils.removeTaskInstanceLogFullPathMDC();
+ LogUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
@Override
public void onClose(WatcherException e) {
+
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
+ taskRequest.getTaskInstanceId());
log.error("[K8sJobExecutor-{}] fail in k8s: {}",
job.getMetadata().getName(), e.getMessage());
taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
countDownLatch.countDown();
+ LogUtils.removeWorkflowAndTaskInstanceIdMDC();
}
};
try (Watch watch =
k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher)) {
@@ -260,10 +266,12 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
String containerName = String.format("%s-%s", taskName,
taskInstanceId);
podLogOutputFuture = collectPodLogExecutorService.submit(() -> {
TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser();
+
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
+ taskRequest.getTaskInstanceId());
+ LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
try (
LogWatch watcher =
ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(),
taskRequest.getTaskAppId(), containerName)) {
-
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
String line;
try (BufferedReader reader = new BufferedReader(new
InputStreamReader(watcher.getOutput()))) {
while ((line = reader.readLine()) != null) {