georgew5656 commented on code in PR #14156:
URL: https://github.com/apache/druid/pull/14156#discussion_r1186294614


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -89,174 +69,128 @@
 
 /**
  * Runs tasks as k8s jobs using the "internal peon" verb.
- * One additional feature of this class is that kubernetes is the source of 
truth, so if you launch a task
- * shutdown druid, bring up druid, the task will keep running and the state 
will be updated when the cluster
- * comes back.  Thus while no tasks are technically restorable, all tasks once 
launched will run in isolation to the
- * extent possible without requiring the overlord consistently up during their 
lifetime.
  */
-
 public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
 {
-
   private static final EmittingLogger log = new 
EmittingLogger(KubernetesTaskRunner.class);
   private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> 
listeners = new CopyOnWriteArrayList<>();
 
   // to cleanup old jobs that might not have been deleted.
   private final ScheduledExecutorService cleanupExecutor;
 
-  protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new 
ConcurrentHashMap<>();
+  protected final ConcurrentHashMap<String, KubernetesWorkItem> tasks = new 
ConcurrentHashMap<>();
   protected final TaskAdapter adapter;
-  protected final KubernetesPeonClient client;
 
-  private final ObjectMapper mapper;
-  private final KubernetesTaskRunnerConfig k8sConfig;
-  private final TaskQueueConfig taskQueueConfig;
-  private final TaskLogs taskLogs;
+  private final KubernetesPeonClient client;
+  private final KubernetesTaskRunnerConfig config;
   private final ListeningExecutorService exec;
   private final HttpClient httpClient;
+  private final PeonLifecycleFactory peonLifecycleFactory;
 
 
   public KubernetesTaskRunner(
-      ObjectMapper mapper,
       TaskAdapter adapter,
-      KubernetesTaskRunnerConfig k8sConfig,
-      TaskQueueConfig taskQueueConfig,
-      TaskLogs taskLogs,
+      KubernetesTaskRunnerConfig config,
       KubernetesPeonClient client,
-      HttpClient httpClient
+      HttpClient httpClient,
+      PeonLifecycleFactory peonLifecycleFactory
   )
   {
-    this.mapper = mapper;
     this.adapter = adapter;
-    this.k8sConfig = k8sConfig;
-    this.taskQueueConfig = taskQueueConfig;
-    this.taskLogs = taskLogs;
+    this.config = config;
     this.client = client;
     this.httpClient = httpClient;
+    this.peonLifecycleFactory = peonLifecycleFactory;
     this.cleanupExecutor = Executors.newScheduledThreadPool(1);
     this.exec = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(taskQueueConfig.getMaxSize(), "k8s-task-runner-%d")
-    );
-    Preconditions.checkArgument(
-        taskQueueConfig.getMaxSize() < Integer.MAX_VALUE,
-        "The task queue bounds how many concurrent k8s tasks you can have"
+        Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d")
     );
   }
 
-
   @Override
   public Optional<InputStream> streamTaskLog(String taskid, long offset)
   {
-    return client.getPeonLogs(new K8sTaskId(taskid));
+    KubernetesWorkItem workItem = tasks.get(taskid);
+    if (workItem == null) {
+      return Optional.absent();
+    }
+    return workItem.streamTaskLogs();
   }
 
   @Override
   public ListenableFuture<TaskStatus> run(Task task)
   {
     synchronized (tasks) {
-      tasks.computeIfAbsent(
-          task.getId(), k -> new K8sWorkItem(
-              client,
-              task,
-              exec.submit(() -> {
-                K8sTaskId k8sTaskId = new K8sTaskId(task);
-                try {
-                  JobResponse completedPhase;
-                  Optional<Job> existingJob = client.jobExists(k8sTaskId);
-                  if (!existingJob.isPresent()) {
-                    Job job = adapter.fromTask(task);
-                    log.info("Job created %s and ready to launch", k8sTaskId);
-                    Pod peonPod = client.launchJobAndWaitForStart(
-                        job,
-                        
KubernetesTaskRunnerConfig.toMilliseconds(k8sConfig.k8sjobLaunchTimeout),
-                        TimeUnit.MILLISECONDS
-                    );
-                    log.info("Job %s launched in k8s", k8sTaskId);
-                    completedPhase = monitorJob(peonPod, k8sTaskId);
-                  } else {
-                    Job job = existingJob.get();
-                    if (job.getStatus().getActive() == null) {
-                      if (job.getStatus().getSucceeded() != null) {
-                        completedPhase = new JobResponse(job, 
PeonPhase.SUCCEEDED);
-                      } else {
-                        completedPhase = new JobResponse(job, 
PeonPhase.FAILED);
-                      }
-                    } else {
-                      // the job is active lets monitor it
-                      completedPhase = monitorJob(k8sTaskId);
-                    }
-                  }
-                  TaskStatus status = getTaskStatus(k8sTaskId, completedPhase);
-                  if (completedPhase.getJobDuration().isPresent()) {
-                    status = 
status.withDuration(completedPhase.getJobDuration().get());
-                  }
-                  updateStatus(task, status);
-                  return status;
-                }
-                catch (Exception e) {
-                  log.error(e, "Error with task: %s", k8sTaskId);
-                  throw e;
-                }
-                finally {
-                  // publish task logs
-                  Path log = Files.createTempFile(task.getId(), "log");
-                  try {
-                    Optional<InputStream> logStream = client.getPeonLogs(new 
K8sTaskId(task.getId()));
-                    if (logStream.isPresent()) {
-                      FileUtils.copyInputStreamToFile(logStream.get(), 
log.toFile());
-                    }
-                    taskLogs.pushTaskLog(task.getId(), log.toFile());
-                  }
-                  finally {
-                    Files.deleteIfExists(log);
-                  }
-                  client.cleanUpJob(new K8sTaskId(task.getId()));
-                  synchronized (tasks) {
-                    tasks.remove(task.getId());
-                  }
-                }
-              })
-          ));
+      tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, 
exec.submit(() -> runTask(task))));
+      return tasks.get(task.getId()).getResult();
+    }
+  }
+
+  protected ListenableFuture<TaskStatus> join(Task task)
+  {
+    synchronized (tasks) {
+      tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, 
exec.submit(() -> joinTask(task))));
       return tasks.get(task.getId()).getResult();
     }
   }
 
-  JobResponse monitorJob(K8sTaskId k8sTaskId)
+  private TaskStatus runTask(Task task)
   {
-    return monitorJob(client.getMainJobPod(k8sTaskId), k8sTaskId);
+    return doTask(task, true);
   }
 
-  JobResponse monitorJob(Pod peonPod, K8sTaskId k8sTaskId)
+  private TaskStatus joinTask(Task task)
   {
-    if (peonPod == null) {
-      throw new ISE("Error in k8s launching peon pod for task %s", k8sTaskId);
-    }
-    return client.waitForJobCompletion(
-        k8sTaskId,
-        KubernetesTaskRunnerConfig.toMilliseconds(k8sConfig.maxTaskDuration),
-        TimeUnit.MILLISECONDS
-    );
+    return doTask(task, false);
   }
 
-  private TaskStatus getTaskStatus(K8sTaskId task, JobResponse jobResponse) 
throws IOException
+  @VisibleForTesting
+  protected TaskStatus doTask(Task task, boolean run)
   {
-    Optional<InputStream> maybeTaskStatusStream = 
taskLogs.streamTaskStatus(task.getOriginalTaskId());
-    if (maybeTaskStatusStream.isPresent()) {
-      String taskStatus = IOUtils.toString(maybeTaskStatusStream.get(), 
StandardCharsets.UTF_8);
-      return mapper.readValue(taskStatus, TaskStatus.class);
-    } else if (PeonPhase.SUCCEEDED.equals(jobResponse.getPhase())) {
-      // fallback to behavior before the introduction of task status streaming 
for backwards compatibility
-      return TaskStatus.success(task.getOriginalTaskId());
-    } else if (Objects.isNull(jobResponse.getJob())) {
-      return TaskStatus.failure(
-          task.getOriginalTaskId(),
-          StringUtils.format("Task [%s] failed kubernetes job disappeared 
before completion", task.getOriginalTaskId())
-      );
-    } else {
-      return TaskStatus.failure(
-          task.getOriginalTaskId(),
-          StringUtils.format("Task [%s] failed", task.getOriginalTaskId())
-      );
+    KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
+
+    synchronized (tasks) {
+      KubernetesWorkItem workItem = tasks.get(task.getId());
+
+      if (workItem == null) {
+        throw new ISE("Task [%s] disappeared", task.getId());
+      }
+
+      if (workItem.isShutdownRequested()) {
+        throw new ISE("Task [%s] has been shut down", task.getId());
+      }
+
+      workItem.setKubernetesPeonLifecycle(peonLifecycle);
+    }
+
+    try {
+      TaskStatus taskStatus;
+      if (run) {
+        taskStatus = peonLifecycle.run(
+            adapter.fromTask(task),
+            config.getTaskLaunchTimeout().toStandardDuration().getMillis(),
+            config.getTaskTimeout().toStandardDuration().getMillis()
+        );
+      } else {
+        taskStatus = peonLifecycle.join(
+            config.getTaskTimeout().toStandardDuration().getMillis()
+        );
+      }
+
+      updateStatus(task, taskStatus);
+
+      return taskStatus;
+    }
+
+    catch (Exception e) {
+      log.error(e, "Task [%s] execution caught an exception", task.getId());

Review Comment:
   In the join method of the KubernetesPeonLifecycle class, there's a finally 
block that cleans up the K8s Job. This block runs even if the main try logic 
throws an error.
   
   
   If there's some problem with this logic in the lifecycle class (for example, 
K8s being offline), there is also a cleanup executor that deletes jobs that 
have been around for a while.
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to