This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new be3f93e3cf Restore tasks when lifecycle start (#14909)
be3f93e3cf is described below

commit be3f93e3cf0c9f10292d1d5349b24a9402dd38de
Author: YongGang <[email protected]>
AuthorDate: Fri Sep 22 12:03:34 2023 -0700

    Restore tasks when lifecycle start (#14909)
    
    * K8s tasks restore should be from lifecycle start
    
    * add test
    
    * add more tests
    
    * fix test
    
    * wait tasks restore finish when start
    
    * fix style
    
    * revert previous change and add comment
---
 .../druid/k8s/overlord/KubernetesTaskRunner.java   |  23 +--
 .../k8s/overlord/KubernetesTaskRunnerTest.java     | 159 +++++++++++----------
 2 files changed, 95 insertions(+), 87 deletions(-)

diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 14fe54778a..33efd848d0 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -23,6 +23,7 @@ import com.google.api.client.util.Lists;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -60,7 +61,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -309,23 +309,25 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
   @Override
   public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
   {
-    List<Pair<Task, ListenableFuture<TaskStatus>>> restoredTasks = new 
ArrayList<>();
+    return ImmutableList.of();
+  }
+
+  @Override
+  @LifecycleStart
+  public void start()
+  {
+    log.info("Starting K8sTaskRunner...");
+    // Load tasks from previously running jobs and wait for their statuses to 
be updated asynchronously.
     for (Job job : client.getPeonJobs()) {
       try {
-        Task task = adapter.toTask(job);
-        restoredTasks.add(Pair.of(task, joinAsync(task)));
+        joinAsync(adapter.toTask(job));
       }
       catch (IOException e) {
         log.error(e, "Error deserializing task from job [%s]", 
job.getMetadata().getName());
       }
     }
-    return restoredTasks;
-  }
+    log.info("Loaded %,d tasks from previous run", tasks.size());
 
-  @Override
-  @LifecycleStart
-  public void start()
-  {
     cleanupExecutor.scheduleAtFixedRate(
         () ->
             client.deleteCompletedPeonJobsOlderThan(
@@ -339,7 +341,6 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
     log.debug("Started cleanup executor for jobs older than %s...", 
config.getTaskCleanupDelay());
   }
 
-
   @Override
   @LifecycleStop
   public void stop()
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 11e7a75af6..e6b1b8006a 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -31,7 +31,6 @@ import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
 import org.apache.druid.java.util.http.client.HttpClient;
@@ -56,7 +55,6 @@ import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
@@ -101,6 +99,89 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     );
   }
 
+  @Test
+  public void test_start_withExistingJobs() throws IOException
+  {
+    KubernetesTaskRunner runner = new KubernetesTaskRunner(
+        taskAdapter,
+        config,
+        peonClient,
+        httpClient,
+        new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+        emitter
+    )
+    {
+      @Override
+      protected ListenableFuture<TaskStatus> joinAsync(Task task)
+      {
+        return tasks.computeIfAbsent(
+            task.getId(),
+            k -> new KubernetesWorkItem(
+                task,
+                Futures.immediateFuture(TaskStatus.success(task.getId()))
+            )
+        ).getResult();
+      }
+    };
+
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName(ID)
+        .endMetadata()
+        .build();
+
+    EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
+    EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);
+
+    replayAll();
+
+    runner.start();
+
+    verifyAll();
+
+    Assert.assertNotNull(runner.tasks);
+    Assert.assertEquals(1, runner.tasks.size());
+  }
+
+  @Test
+  public void test_start_whenDeserializationExceptionThrown_isIgnored() throws 
IOException
+  {
+    KubernetesTaskRunner runner = new KubernetesTaskRunner(
+        taskAdapter,
+        config,
+        peonClient,
+        httpClient,
+        new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+        emitter
+    )
+    {
+      @Override
+      protected ListenableFuture<TaskStatus> joinAsync(Task task)
+      {
+        return tasks.computeIfAbsent(task.getId(), k -> new 
KubernetesWorkItem(task, null))
+                    .getResult();
+      }
+    };
+
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName(ID)
+        .endMetadata()
+        .build();
+
+    EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
+    EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException());
+
+    replayAll();
+
+    runner.start();
+
+    verifyAll();
+
+    Assert.assertNotNull(runner.tasks);
+    Assert.assertEquals(0, runner.tasks.size());
+  }
+
   @Test
   public void test_streamTaskLog_withoutExistingTask_returnsEmptyOptional()
   {
@@ -263,80 +344,6 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     runner.shutdown(task.getId(), "");
   }
 
-  @Test
-  public void test_restore_withExistingJobs() throws IOException
-  {
-    KubernetesTaskRunner runner = new KubernetesTaskRunner(
-        taskAdapter,
-        config,
-        peonClient,
-        httpClient,
-        new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
-        emitter
-    ) {
-      @Override
-      protected ListenableFuture<TaskStatus> joinAsync(Task task)
-      {
-        return new KubernetesWorkItem(task, null).getResult();
-      }
-    };
-
-    Job job = new JobBuilder()
-        .withNewMetadata()
-        .withName(ID)
-        .endMetadata()
-        .build();
-
-    EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
-    EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);
-
-    replayAll();
-
-    List<Pair<Task, ListenableFuture<TaskStatus>>> tasks = runner.restore();
-
-    verifyAll();
-
-    Assert.assertNotNull(tasks);
-    Assert.assertEquals(1, tasks.size());
-  }
-
-  @Test
-  public void test_restore_whenDeserializationExceptionThrown_isIgnored() 
throws IOException
-  {
-    KubernetesTaskRunner runner = new KubernetesTaskRunner(
-        taskAdapter,
-        config,
-        peonClient,
-        httpClient,
-        new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
-        emitter
-    ) {
-      @Override
-      protected ListenableFuture<TaskStatus> joinAsync(Task task)
-      {
-        return new KubernetesWorkItem(task, null).getResult();
-      }
-    };
-
-    Job job = new JobBuilder()
-        .withNewMetadata()
-        .withName(ID)
-        .endMetadata()
-        .build();
-
-    EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
-    EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException());
-
-    replayAll();
-
-    List<Pair<Task, ListenableFuture<TaskStatus>>> tasks = runner.restore();
-
-    verifyAll();
-
-    Assert.assertNotNull(tasks);
-    Assert.assertEquals(0, tasks.size());
-  }
-
   @Test
   public void test_getTotalTaskSlotCount()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to