zhaohehuhu commented on code in PR #15054:
URL: 
https://github.com/apache/dolphinscheduler/pull/15054#discussion_r1446997165


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java:
##########
@@ -193,7 +195,55 @@ public void buildK8sJob(K8sTaskMainParameters 
k8STaskMainParameters) {
                 .build();
 
     }
+    public void registerBatchJobFormer(Job job, String taskInstanceId, 
TaskResponse taskResponse) {
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        ResourceEventHandler resourceEventHandler =  new 
ResourceEventHandler<Job>() {
+            @Override
+            public void onAdd(Job job) {
+                log.info("[K8sJobExecutor-{}] job got added", 
job.getMetadata().getName());
+            }
+            @Override
+            public void onUpdate(Job job, Job t1) {

Review Comment:
   done.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java:
##########
@@ -193,7 +195,55 @@ public void buildK8sJob(K8sTaskMainParameters 
k8STaskMainParameters) {
                 .build();
 
     }
+    public void registerBatchJobFormer(Job job, String taskInstanceId, 
TaskResponse taskResponse) {
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        ResourceEventHandler resourceEventHandler =  new 
ResourceEventHandler<Job>() {
+            @Override
+            public void onAdd(Job job) {
+                log.info("[K8sJobExecutor-{}] job got added", 
job.getMetadata().getName());
+            }
+            @Override
+            public void onUpdate(Job job, Job t1) {
+                try {
+                    
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
+                    log.info("[K8sJobExecutor-{}] job got updated", 
job.getMetadata().getName());
+                    int jobStatus = getK8sJobStatus(job);
+                    log.info("job {} status {}", job.getMetadata().getName(), 
jobStatus);
+                    if (jobStatus == TaskConstants.RUNNING_CODE) {
+                        return;
+                    }
+                        setTaskStatus(jobStatus, taskInstanceId, taskResponse);
+                        countDownLatch.countDown();
+                } finally {
+                    LogUtils.removeTaskInstanceLogFullPathMDC();
+                }
 
+            }
+            @Override
+            public void onDelete(Job job, boolean b) {
+                log.info("[K8sJobExecutor-{}] job got deleted", 
job.getMetadata().getName());
+
+            }
+        };
+        try (SharedIndexInformer sharedIndexInformer = 
k8sUtils.createBatchJobInformer(job, resourceEventHandler)) {
+            boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == 
TaskTimeoutStrategy.FAILED
+                    || taskRequest.getTaskTimeoutStrategy() == 
TaskTimeoutStrategy.WARNFAILED;
+            if (timeoutFlag) {
+                Boolean timeout = 
!(countDownLatch.await(taskRequest.getTaskTimeout(), TimeUnit.SECONDS));
+                waitTimeout(timeout);
+            } else {
+                countDownLatch.await();
+            }
+        } catch (InterruptedException e) {
+            log.error("job failed in k8s: {}", e.getMessage(), e);
+            Thread.currentThread().interrupt();
+            taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
+        } catch (Exception e) {
+            log.error("job failed in k8s: {}", e.getMessage(), e);
+            taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
+        }
+    }
+    @Deprecated

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to