This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5764183d4e3 k8s-based-ingestion: Wait for task lifecycles to enter 
RUNNING state before returning from KubernetesTaskRunner.start (#17446)
5764183d4e3 is described below

commit 5764183d4e3c3d8267e98eced05858d5524816c9
Author: George Shiqi Wu <[email protected]>
AuthorDate: Fri Nov 8 08:13:35 2024 -0800

    k8s-based-ingestion: Wait for task lifecycles to enter RUNNING state before 
returning from KubernetesTaskRunner.start (#17446)
    
    * Add a wait on start() for task lifecycle to go into running
    
    * handle exceptions
    
    * Fix logging messages
    
    * Don't pass in the settable future as an arg
    
    * add some unit tests
---
 docs/development/extensions-contrib/k8s-jobs.md    |   1 +
 .../k8s/overlord/KubernetesPeonLifecycle.java      |  41 ++++++-
 .../druid/k8s/overlord/KubernetesTaskRunner.java   |  79 ++++++++++---
 .../k8s/overlord/KubernetesTaskRunnerConfig.java   |  28 ++++-
 .../druid/k8s/overlord/KubernetesWorkItem.java     |  28 ++---
 .../k8s/overlord/KubernetesPeonLifecycleTest.java  |  51 ++++++++-
 .../k8s/overlord/KubernetesTaskRunnerTest.java     | 122 ++++++++++++++++-----
 .../druid/k8s/overlord/KubernetesWorkItemTest.java |  93 ++++++----------
 8 files changed, 311 insertions(+), 132 deletions(-)

diff --git a/docs/development/extensions-contrib/k8s-jobs.md 
b/docs/development/extensions-contrib/k8s-jobs.md
index 913e40b9373..11663642bdd 100644
--- a/docs/development/extensions-contrib/k8s-jobs.md
+++ b/docs/development/extensions-contrib/k8s-jobs.md
@@ -610,6 +610,7 @@ Druid selects the pod template 
`podSpecWithHighMemRequests.yaml`.
 |`druid.indexer.runner.maxTaskDuration`| `Duration`      | Max time a task is 
allowed to run for before getting killed                                        
                                                                                
                                                              |`PT4H`|No|
 |`druid.indexer.runner.taskCleanupDelay`| `Duration`      | How long do jobs 
stay around before getting reaped from K8s                                      
                                                                                
                                                                |`P2D`|No|
 |`druid.indexer.runner.taskCleanupInterval`| `Duration`      | How often to 
check for jobs to be reaped                                                     
                                                                                
                                                                    |`PT10M`|No|
+|`druid.indexer.runner.taskJoinTimeout`| `Duration`      | Timeout for 
gathering metadata about existing tasks on startup                              
                                                                                
                                                                                
           |`PT1M`|No|
 |`druid.indexer.runner.K8sjobLaunchTimeout`| `Duration`      | How long to 
wait to launch a K8s task before marking it as failed, on a resource 
constrained cluster it may take some time.                                      
                                                                                
|`PT1H`|No|
 |`druid.indexer.runner.javaOptsArray`| `JsonArray`     | java opts for the 
task.                                                                           
                                                                                
                                                               |`-Xmx1g`|No|
 |`druid.indexer.runner.labels`| `JsonObject`    | Additional labels you want 
to add to peon pod                                                              
                                                                                
                                                      |`{}`|No|
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index 59e1d0f88b3..4dffaf5d0fc 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -22,6 +22,8 @@ package org.apache.druid.k8s.overlord;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodStatus;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -90,6 +92,8 @@ public class KubernetesPeonLifecycle
   private final KubernetesPeonClient kubernetesClient;
   private final ObjectMapper mapper;
   private final TaskStateListener stateListener;
+  private final SettableFuture<Boolean> taskStartedSuccessfullyFuture;
+
   @MonotonicNonNull
   private LogWatch logWatch;
 
@@ -109,6 +113,7 @@ public class KubernetesPeonLifecycle
     this.taskLogs = taskLogs;
     this.mapper = mapper;
     this.stateListener = stateListener;
+    this.taskStartedSuccessfullyFuture = SettableFuture.create();
   }
 
   /**
@@ -137,11 +142,13 @@ public class KubernetesPeonLifecycle
           launchTimeout,
           TimeUnit.MILLISECONDS
       );
-
       return join(timeout);
     }
     catch (Exception e) {
       log.info("Failed to run task: %s", taskId.getOriginalTaskId());
+      if (!taskStartedSuccessfullyFuture.isDone()) {
+        taskStartedSuccessfullyFuture.set(false);
+      }
       throw e;
     }
     finally {
@@ -179,7 +186,7 @@ public class KubernetesPeonLifecycle
   {
     try {
       updateState(new State[]{State.NOT_STARTED, State.PENDING}, 
State.RUNNING);
-
+      taskStartedSuccessfullyFuture.set(true);
       JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
           taskId,
           timeout,
@@ -188,6 +195,12 @@ public class KubernetesPeonLifecycle
 
       return getTaskStatus(jobResponse.getJobDuration());
     }
+    catch (Exception e) {
+      if (!taskStartedSuccessfullyFuture.isDone()) {
+        taskStartedSuccessfullyFuture.set(false);
+      }
+      throw e;
+    }
     finally {
       try {
         saveLogs();
@@ -195,7 +208,6 @@ public class KubernetesPeonLifecycle
       catch (Exception e) {
         log.warn(e, "Log processing failed for task [%s]", taskId);
       }
-
       stopTask();
     }
   }
@@ -246,7 +258,10 @@ public class KubernetesPeonLifecycle
   protected TaskLocation getTaskLocation()
   {
     if (State.PENDING.equals(state.get()) || 
State.NOT_STARTED.equals(state.get())) {
-      log.debug("Can't get task location for non-running job. [%s]", 
taskId.getOriginalTaskId());
+      /* This should not actually ever happen because 
KubernetesTaskRunner.start() should not return until all running tasks
+      have already gone into State.RUNNING, so getTaskLocation should not be 
called.
+       */
+      log.warn("Can't get task location for non-running job. [%s]", 
taskId.getOriginalTaskId());
       return TaskLocation.unknown();
     }
 
@@ -257,6 +272,10 @@ public class KubernetesPeonLifecycle
     if (taskLocation == null) {
       Optional<Pod> maybePod = 
kubernetesClient.getPeonPod(taskId.getK8sJobName());
       if (!maybePod.isPresent()) {
+        /* Arguably we should throw an exception here but leaving it as a warn 
log to prevent unexpected errors.
+         If there is strange behavior during overlord restarts the operator 
should look for this warn log.
+        */
+        log.warn("Could not get task location from k8s for task [%s].", 
taskId);
         return TaskLocation.unknown();
       }
 
@@ -264,6 +283,7 @@ public class KubernetesPeonLifecycle
       PodStatus podStatus = pod.getStatus();
 
       if (podStatus == null || podStatus.getPodIP() == null) {
+        log.warn("Could not get task location from k8s for task [%s].", 
taskId);
         return TaskLocation.unknown();
       }
       taskLocation = TaskLocation.create(
@@ -376,4 +396,17 @@ public class KubernetesPeonLifecycle
     );
     stateListener.stateChanged(state.get(), taskId.getOriginalTaskId());
   }
+
+  /**
+   * Retrieves the current {@link ListenableFuture} representing whether the 
task started successfully
+   *
+   * <p>This future can be used to track whether the task started 
successfully, with a boolean result
+   * indicating success (true) or failure (false) when the task starts.
+   *
+   * @return a {@link ListenableFuture} representing whether the task started 
successfully.
+   */
+  protected ListenableFuture<Boolean> getTaskStartedSuccessfullyFuture()
+  {
+    return taskStartedSuccessfullyFuture;
+  }
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index c324b49e13a..deb1f0b3d9c 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
@@ -55,12 +56,14 @@ import 
org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
 import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
 import org.apache.druid.tasklogs.TaskLogStreamer;
 import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
 import org.joda.time.Duration;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -146,16 +149,28 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
   public ListenableFuture<TaskStatus> run(Task task)
   {
     synchronized (tasks) {
-      return tasks.computeIfAbsent(task.getId(), k -> new 
KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
-                  .getResult();
+      return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(
+          task,
+          exec.submit(() -> runTask(task)),
+          peonLifecycleFactory.build(
+              task,
+              this::emitTaskStateMetrics
+          )
+      )).getResult();
     }
   }
 
-  protected ListenableFuture<TaskStatus> joinAsync(Task task)
+  protected KubernetesWorkItem joinAsync(Task task)
   {
     synchronized (tasks) {
-      return tasks.computeIfAbsent(task.getId(), k -> new 
KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
-                  .getResult();
+      return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(
+          task,
+          exec.submit(() -> joinTask(task)),
+          peonLifecycleFactory.build(
+              task,
+              this::emitTaskStateMetrics
+          )
+      ));
     }
   }
 
@@ -173,10 +188,7 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
   protected TaskStatus doTask(Task task, boolean run)
   {
     try {
-      KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(
-          task,
-          this::emitTaskStateMetrics
-      );
+      KubernetesPeonLifecycle peonLifecycle;
 
       synchronized (tasks) {
         KubernetesWorkItem workItem = tasks.get(task.getId());
@@ -185,7 +197,7 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
           throw new ISE("Task [%s] has been shut down", task.getId());
         }
 
-        workItem.setKubernetesPeonLifecycle(peonLifecycle);
+        peonLifecycle = workItem.getPeonLifeycle();
       }
 
       TaskStatus taskStatus;
@@ -321,16 +333,53 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
   public void start()
   {
     log.info("Starting K8sTaskRunner...");
-    // Load tasks from previously running jobs and wait for their statuses to 
be updated asynchronously.
-    for (Job job : client.getPeonJobs()) {
+    // Load tasks from previously running jobs and wait for their statuses to 
start running.
+    final List<ListenableFuture<Boolean>> taskStatusActiveList = new 
ArrayList<>();
+    final List<Job> peonJobs = client.getPeonJobs();
+
+    log.info("Locating [%,d] active tasks.", peonJobs.size());
+    for (Job job : peonJobs) {
       try {
-        joinAsync(adapter.toTask(job));
+        KubernetesWorkItem kubernetesWorkItem = joinAsync(adapter.toTask(job));
+        
taskStatusActiveList.add(kubernetesWorkItem.getPeonLifeycle().getTaskStartedSuccessfullyFuture());
       }
       catch (IOException e) {
         log.error(e, "Error deserializing task from job [%s]", 
job.getMetadata().getName());
       }
     }
-    log.info("Loaded %,d tasks from previous run", tasks.size());
+
+    try {
+      final DateTime nowUtc = DateTimes.nowUtc();
+      final long timeoutMs = 
nowUtc.plus(config.getTaskJoinTimeout()).getMillis() - nowUtc.getMillis();
+      if (timeoutMs > 0) {
+        FutureUtils.coalesce(taskStatusActiveList).get(timeoutMs, 
TimeUnit.MILLISECONDS);
+      }
+      log.info("Located [%,d] active tasks.", taskStatusActiveList.size());
+    }
+    catch (Exception e) {
+      final long numInitialized =
+          tasks.values()
+              .stream()
+              .filter(item -> {
+                if 
(item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().isDone()) {
+                  try {
+                    return 
item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().get();
+                  }
+                  catch (InterruptedException | ExecutionException ex) {
+                    return false;
+                  }
+                } else {
+                  return false;
+                }
+              }).count();
+      log.warn(
+          e,
+          "Located [%,d] out of [%,d] active tasks (timeout = %s). Locating 
others asynchronously.",
+          numInitialized,
+          taskStatusActiveList.size(),
+          config.getTaskJoinTimeout()
+      );
+    }
 
     cleanupExecutor.scheduleAtFixedRate(
         () ->
@@ -342,7 +391,7 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
         config.getTaskCleanupInterval().toStandardDuration().getMillis(),
         TimeUnit.MILLISECONDS
     );
-    log.debug("Started cleanup executor for jobs older than %s...", 
config.getTaskCleanupDelay());
+    log.info("Started cleanup executor for jobs older than %s...", 
config.getTaskCleanupDelay());
   }
 
   @Override
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
index 60efa3c4856..106378f57aa 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
@@ -85,6 +85,12 @@ public class KubernetesTaskRunnerConfig
   // interval for k8s job cleanup to run
   private Period taskCleanupInterval = new Period("PT10m");
 
+  @JsonProperty
+  @NotNull
+  // how long to wait to join peon k8s jobs on startup
+  private Period taskJoinTimeout = new Period("PT1M");
+
+
   @JsonProperty
   @NotNull
   // how long to wait for the peon k8s job to launch
@@ -140,7 +146,8 @@ public class KubernetesTaskRunnerConfig
       int cpuCoreInMicro,
       Map<String, String> labels,
       Map<String, String> annotations,
-      Integer capacity
+      Integer capacity,
+      Period taskJoinTimeout
   )
   {
     this.namespace = namespace;
@@ -181,6 +188,10 @@ public class KubernetesTaskRunnerConfig
         k8sjobLaunchTimeout,
         this.k8sjobLaunchTimeout
     );
+    this.taskJoinTimeout = ObjectUtils.defaultIfNull(
+        taskJoinTimeout,
+        this.taskJoinTimeout
+    );
     this.peonMonitors = ObjectUtils.defaultIfNull(
         peonMonitors,
         this.peonMonitors
@@ -247,6 +258,11 @@ public class KubernetesTaskRunnerConfig
   {
     return maxTaskDuration;
   }
+  public Period getTaskJoinTimeout()
+  {
+    return taskJoinTimeout;
+  }
+
 
   public Period getTaskCleanupDelay()
   {
@@ -317,6 +333,7 @@ public class KubernetesTaskRunnerConfig
     private Map<String, String> labels;
     private Map<String, String> annotations;
     private Integer capacity;
+    private Period taskJoinTimeout;
 
     public Builder()
     {
@@ -425,6 +442,12 @@ public class KubernetesTaskRunnerConfig
       return this;
     }
 
+    public Builder withTaskJoinTimeout(Period taskJoinTimeout)
+    {
+      this.taskJoinTimeout = taskJoinTimeout;
+      return this;
+    }
+
     public KubernetesTaskRunnerConfig build()
     {
       return new KubernetesTaskRunnerConfig(
@@ -444,7 +467,8 @@ public class KubernetesTaskRunnerConfig
           this.cpuCoreInMicro,
           this.labels,
           this.annotations,
-          this.capacity
+          this.capacity,
+          this.taskJoinTimeout
       );
     }
   }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
index 94d4bbb67f6..5eb55b097b4 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
@@ -20,7 +20,6 @@
 package org.apache.druid.k8s.overlord;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
@@ -34,27 +33,19 @@ import java.io.InputStream;
 public class KubernetesWorkItem extends TaskRunnerWorkItem
 {
   private final Task task;
-  private KubernetesPeonLifecycle kubernetesPeonLifecycle = null;
+  private final KubernetesPeonLifecycle kubernetesPeonLifecycle;
 
-  public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus> 
statusFuture)
+  public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus> 
statusFuture, KubernetesPeonLifecycle kubernetesPeonLifecycle)
   {
     super(task.getId(), statusFuture);
     this.task = task;
-  }
-
-  protected synchronized void 
setKubernetesPeonLifecycle(KubernetesPeonLifecycle kubernetesPeonLifecycle)
-  {
-    Preconditions.checkState(this.kubernetesPeonLifecycle == null);
     this.kubernetesPeonLifecycle = kubernetesPeonLifecycle;
   }
 
   protected synchronized void shutdown()
   {
-
-    if (this.kubernetesPeonLifecycle != null) {
-      this.kubernetesPeonLifecycle.startWatchingLogs();
-      this.kubernetesPeonLifecycle.shutdown();
-    }
+    this.kubernetesPeonLifecycle.startWatchingLogs();
+    this.kubernetesPeonLifecycle.shutdown();
   }
 
   protected boolean isPending()
@@ -88,18 +79,12 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
 
   protected Optional<InputStream> streamTaskLogs()
   {
-    if (kubernetesPeonLifecycle == null) {
-      return Optional.absent();
-    }
     return kubernetesPeonLifecycle.streamLogs();
   }
 
   @Override
   public TaskLocation getLocation()
   {
-    if (kubernetesPeonLifecycle == null) {
-      return TaskLocation.unknown();
-    }
     return kubernetesPeonLifecycle.getTaskLocation();
   }
 
@@ -119,4 +104,9 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
   {
     return task;
   }
+
+  protected KubernetesPeonLifecycle getPeonLifeycle()
+  {
+    return this.kubernetesPeonLifecycle;
+  }
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 13ad4fda209..85ea7072b85 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -52,6 +52,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -264,6 +265,53 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, 
peonLifecycle.getState());
   }
 
+  @Test
+  public void test_run_whenExceptionRaised_setsStartStatusFutureToFalse() 
throws ExecutionException, InterruptedException
+  {
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    )
+    {
+      @Override
+      protected synchronized TaskStatus join(long timeout)
+      {
+        throw new IllegalStateException();
+      }
+    };
+
+    Job job = new 
JobBuilder().withNewMetadata().withName(ID).endMetadata().build();
+
+    EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart(
+        EasyMock.eq(job),
+        EasyMock.eq(task),
+        EasyMock.anyLong(),
+        EasyMock.eq(TimeUnit.MILLISECONDS)
+    )).andReturn(null);
+    Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
+    EasyMock.expectLastCall().once();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+    EasyMock.expectLastCall().once();
+
+    replayAll();
+
+    Assert.assertThrows(
+        Exception.class,
+        () -> peonLifecycle.run(job, 0L, 0L, false)
+    );
+
+    verifyAll();
+
+    Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, 
peonLifecycle.getState());
+    
Assert.assertTrue(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone());
+    Assert.assertFalse(peonLifecycle.getTaskStartedSuccessfullyFuture().get());
+
+  }
+
   @Test
   public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException
   {
@@ -313,6 +361,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
         stateListener
     );
 
+    
Assert.assertFalse(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone());
     Job job = new JobBuilder()
         .withNewMetadata()
         .withName(ID)
@@ -347,7 +396,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     TaskStatus taskStatus = peonLifecycle.join(0L);
 
     verifyAll();
-
+    
Assert.assertTrue(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone());
     Assert.assertEquals(SUCCESS.withDuration(58000), taskStatus);
     Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, 
peonLifecycle.getState());
   }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 67a5278c6a3..a6c01ee306a 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
 import org.apache.commons.io.IOUtils;
@@ -103,6 +104,8 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
   @Test
   public void test_start_withExistingJobs() throws IOException
   {
+    SettableFuture<Boolean> settableFuture = SettableFuture.create();
+    settableFuture.set(true);
     KubernetesTaskRunner runner = new KubernetesTaskRunner(
         taskAdapter,
         config,
@@ -113,15 +116,16 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     )
     {
       @Override
-      protected ListenableFuture<TaskStatus> joinAsync(Task task)
+      protected KubernetesWorkItem joinAsync(Task task)
       {
         return tasks.computeIfAbsent(
             task.getId(),
             k -> new KubernetesWorkItem(
                 task,
-                Futures.immediateFuture(TaskStatus.success(task.getId()))
+                Futures.immediateFuture(TaskStatus.success(task.getId())),
+                kubernetesPeonLifecycle
             )
-        ).getResult();
+        );
       }
     };
 
@@ -133,6 +137,67 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
 
     EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
     EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);
+    
EasyMock.expect(kubernetesPeonLifecycle.getTaskStartedSuccessfullyFuture()).andReturn(
+        settableFuture
+    );
+
+    replayAll();
+
+    runner.start();
+
+    verifyAll();
+
+    Assert.assertNotNull(runner.tasks);
+    Assert.assertEquals(1, runner.tasks.size());
+  }
+
+  @Test
+  public void test_start_withExistingJobs_oneJobFails() throws IOException
+  {
+    SettableFuture<Boolean> settableFuture = SettableFuture.create();
+    settableFuture.set(true);
+    KubernetesTaskRunner runner = new KubernetesTaskRunner(
+        taskAdapter,
+        config,
+        peonClient,
+        httpClient,
+        new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+        emitter
+    )
+    {
+      @Override
+      protected KubernetesWorkItem joinAsync(Task task)
+      {
+        return tasks.computeIfAbsent(
+            task.getId(),
+            k -> new KubernetesWorkItem(
+                task,
+                Futures.immediateFuture(TaskStatus.success(task.getId())),
+                kubernetesPeonLifecycle
+            )
+        );
+      }
+    };
+
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName(ID)
+        .endMetadata()
+        .build();
+
+    Job job2 = new JobBuilder()
+        .withNewMetadata()
+        .withName("id2")
+        .endMetadata()
+        .build();
+
+    EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job, 
job2));
+    EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);
+    EasyMock.expect(taskAdapter.toTask(job2)).andThrow(new 
IOException("deserialization exception"));
+
+    
EasyMock.expect(kubernetesPeonLifecycle.getTaskStartedSuccessfullyFuture()).andReturn(
+        settableFuture
+    );
 
     replayAll();
 
@@ -157,10 +222,9 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     )
     {
       @Override
-      protected ListenableFuture<TaskStatus> joinAsync(Task task)
+      protected KubernetesWorkItem joinAsync(Task task)
       {
-        return tasks.computeIfAbsent(task.getId(), k -> new 
KubernetesWorkItem(task, null))
-                    .getResult();
+        return tasks.computeIfAbsent(task.getId(), k -> new 
KubernetesWorkItem(task, null, kubernetesPeonLifecycle));
       }
     };
 
@@ -193,7 +257,7 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
   @Test
   public void test_streamTaskLog_withExistingTask() throws IOException
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null)
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, 
kubernetesPeonLifecycle)
     {
       @Override
       protected Optional<InputStream> streamTaskLogs()
@@ -241,7 +305,7 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
   @Test
   public void test_run_withExistingTask_returnsExistingWorkItem()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, 
kubernetesPeonLifecycle);
     runner.tasks.put(task.getId(), workItem);
 
     ListenableFuture<TaskStatus> future = runner.run(task);
@@ -286,8 +350,8 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
 
     replayAll();
 
-    ListenableFuture<TaskStatus> future = runner.joinAsync(task);
-    Assert.assertEquals(taskStatus, future.get());
+    KubernetesWorkItem workItem = runner.joinAsync(task);
+    Assert.assertEquals(taskStatus, workItem.getResult().get());
 
     verifyAll();
   }
@@ -295,7 +359,7 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
   @Test
   public void test_join_withExistingTask_returnsExistingWorkItem()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, 
kubernetesPeonLifecycle);
     runner.tasks.put(task.getId(), workItem);
 
     ListenableFuture<TaskStatus> future = runner.run(task);
@@ -310,9 +374,9 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
 
     replayAll();
 
-    ListenableFuture<TaskStatus> future = runner.joinAsync(task);
+    KubernetesWorkItem workItem = runner.joinAsync(task);
 
-    Exception e = Assert.assertThrows(ExecutionException.class, future::get);
+    Exception e = Assert.assertThrows(ExecutionException.class, () -> 
workItem.getResult().get());
     Assert.assertTrue(e.getCause() instanceof RuntimeException);
 
     verifyAll();
@@ -331,7 +395,7 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
   @Test
   public void test_shutdown_withExistingTask_removesTaskFromMap()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture) {
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture, 
kubernetesPeonLifecycle) {
       @Override
       protected synchronized void shutdown()
       {
@@ -348,7 +412,7 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
   @Test
   public void 
test_shutdown_withExistingTask_futureIncomplete_removesTaskFromMap()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture) {
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture, 
kubernetesPeonLifecycle) {
       @Override
       protected synchronized void shutdown()
       {
@@ -385,7 +449,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   @Test
   public void test_getKnownTasks()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle);
 
     runner.tasks.put(task.getId(), workItem);
 
@@ -399,7 +463,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   public void test_getRunningTasks()
   {
     Task pendingTask = K8sTestUtils.createTask("pending-id", 0);
-    KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) {
+    KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null, kubernetesPeonLifecycle) {
       @Override
       protected RunnerTaskState getRunnerTaskState()
       {
@@ -409,7 +473,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
     runner.tasks.put(pendingTask.getId(), pendingWorkItem);
 
     Task runningTask = K8sTestUtils.createTask("running-id", 0);
-    KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) {
+    KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null, kubernetesPeonLifecycle) {
       @Override
       protected RunnerTaskState getRunnerTaskState()
       {
@@ -428,7 +492,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   public void test_getPendingTasks()
   {
     Task pendingTask = K8sTestUtils.createTask("pending-id", 0);
-    KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) {
+    KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null, kubernetesPeonLifecycle) {
       @Override
       protected RunnerTaskState getRunnerTaskState()
       {
@@ -438,7 +502,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
     runner.tasks.put(pendingTask.getId(), pendingWorkItem);
 
     Task runningTask = K8sTestUtils.createTask("running-id", 0);
-    KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) {
+    KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null, kubernetesPeonLifecycle) {
       @Override
       protected RunnerTaskState getRunnerTaskState()
       {
@@ -462,7 +526,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   @Test
   public void test_getRunnerTaskState_withExistingTask()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
       @Override
       protected RunnerTaskState getRunnerTaskState()
       {
@@ -477,7 +541,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   @Test
   public void test_streamTaskReports_withExistingTask() throws Exception
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
       @Override
       public TaskLocation getLocation()
       {
@@ -512,7 +576,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   @Test
   public void test_streamTaskReports_withUnknownTaskLocation_returnsEmptyOptional() throws Exception
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
       @Override
       public TaskLocation getLocation()
       {
@@ -529,7 +593,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   @Test
   public void test_streamTaskReports_whenInterruptedExceptionThrown_throwsRuntimeException()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
       @Override
       public TaskLocation getLocation()
       {
@@ -593,7 +657,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   @Test
   public void test_streamTaskReports_whenExecutionExceptionThrown_throwsRuntimeException()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
       @Override
       public TaskLocation getLocation()
       {
@@ -618,7 +682,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   @Test
   public void test_metricsReported_whenTaskStateChange()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
       @Override
       public TaskLocation getLocation()
       {
@@ -640,7 +704,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   @Test
   public void test_getTaskLocation_withExistingTask()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
       @Override
       public TaskLocation getLocation()
       {
@@ -657,7 +721,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   @Test
   public void test_getTaskLocation_throws()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null)
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle)
     {
       @Override
       public TaskLocation getLocation()
@@ -689,7 +753,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   public void test_getUsedCapacity()
   {
     Assert.assertEquals(0, runner.getUsedCapacity());
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle);
     runner.tasks.put(task.getId(), workItem);
     Assert.assertEquals(1, runner.getUsedCapacity());
     runner.tasks.remove(task.getId());
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
index 7d17193b171..fe2b576bccb 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
@@ -45,36 +45,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
   public void setup()
   {
     task = NoopTask.create();
-    workItem = new KubernetesWorkItem(task, null);
-  }
-
-  @Test
-  public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException()
-  {
-    workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle(
-        task,
-        null,
-        null,
-        null,
-        null
-    ));
-
-    Assert.assertThrows(
-        IllegalStateException.class,
-        () -> workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle(
-            task,
-            null,
-            null,
-            null,
-            null
-        ))
-    );
-  }
-
-  @Test
-  public void test_shutdown_withoutKubernetesPeonLifecycle()
-  {
-    workItem.shutdown();
   }
 
   @Test
@@ -86,7 +56,11 @@ public class KubernetesWorkItemTest extends EasyMockSupport
     EasyMock.expectLastCall();
 
     replayAll();
-    workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle);
+    workItem = new KubernetesWorkItem(
+        task,
+        null,
+        kubernetesPeonLifecycle
+    );
 
     workItem.shutdown();
     verifyAll();
@@ -95,7 +69,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
   @Test
   public void test_isPending_withTaskStateWaiting_returnsFalse()
   {
-    workItem = new KubernetesWorkItem(task, null) {
+    workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
       @Override
       protected RunnerTaskState getRunnerTaskState()
       {
@@ -108,7 +82,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
   @Test
   public void test_isPending_withTaskStatePending_returnsTrue()
   {
-    workItem = new KubernetesWorkItem(task, null) {
+    workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
       @Override
       protected RunnerTaskState getRunnerTaskState()
       {
@@ -121,7 +95,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
   @Test
   public void test_isRunning_withTaskStateWaiting_returnsFalse()
   {
-    workItem = new KubernetesWorkItem(task, null) {
+    workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
       @Override
       protected RunnerTaskState getRunnerTaskState()
       {
@@ -134,7 +108,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
   @Test
   public void test_isRunning_withTaskStatePending_returnsTrue()
   {
-    workItem = new KubernetesWorkItem(task, null) {
+    workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
       @Override
       protected RunnerTaskState getRunnerTaskState()
       {
@@ -144,22 +118,17 @@ public class KubernetesWorkItemTest extends EasyMockSupport
     Assert.assertTrue(workItem.isRunning());
   }
 
-  @Test
-  public void test_getRunnerTaskState_withoutKubernetesPeonLifecycle_returnsPending()
-  {
-    Assert.assertEquals(RunnerTaskState.PENDING, workItem.getRunnerTaskState());
-  }
-
   @Test
   public void test_getRunnerTaskState_withKubernetesPeonLifecycle_returnsPending()
   {
-    workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle(
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
         task,
         null,
         null,
         null,
         null
-    ));
+    );
+    workItem = new KubernetesWorkItem(task, null, peonLifecycle);
 
     Assert.assertEquals(RunnerTaskState.PENDING, workItem.getRunnerTaskState());
   }
@@ -181,7 +150,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
       }
     };
 
-    workItem.setKubernetesPeonLifecycle(peonLifecycle);
+    workItem = new KubernetesWorkItem(task, null, peonLifecycle);
 
     Assert.assertEquals(RunnerTaskState.PENDING, workItem.getRunnerTaskState());
   }
@@ -203,7 +172,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
       }
     };
 
-    workItem.setKubernetesPeonLifecycle(peonLifecycle);
+    workItem = new KubernetesWorkItem(task, null, peonLifecycle);
 
     Assert.assertEquals(RunnerTaskState.RUNNING, workItem.getRunnerTaskState());
   }
@@ -225,46 +194,36 @@ public class KubernetesWorkItemTest extends EasyMockSupport
       }
     };
 
-    workItem.setKubernetesPeonLifecycle(peonLifecycle);
+    workItem = new KubernetesWorkItem(task, null, peonLifecycle);
 
     Assert.assertEquals(RunnerTaskState.NONE, workItem.getRunnerTaskState());
   }
 
-  @Test
-  public void test_streamTaskLogs_withoutKubernetesPeonLifecycle()
-  {
-    Assert.assertFalse(workItem.streamTaskLogs().isPresent());
-  }
-
   @Test
   public void test_streamTaskLogs_withKubernetesPeonLifecycle()
   {
-    workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle(
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
         task,
         null,
         null,
         null,
         null
-    ));
+    );
+    workItem = new KubernetesWorkItem(task, null, peonLifecycle);
     Assert.assertFalse(workItem.streamTaskLogs().isPresent());
   }
 
-  @Test
-  public void test_getLocation_withoutKubernetesPeonLifecycle()
-  {
-    Assert.assertEquals(TaskLocation.unknown(), workItem.getLocation());
-  }
-
   @Test
   public void test_getLocation_withKubernetesPeonLifecycle()
   {
-    workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle(
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
         task,
         null,
         null,
         null,
         null
-    ));
+    );
+    workItem = new KubernetesWorkItem(task, null, peonLifecycle);
 
     Assert.assertEquals(TaskLocation.unknown(), workItem.getLocation());
   }
@@ -272,18 +231,28 @@ public class KubernetesWorkItemTest extends EasyMockSupport
   @Test
   public void test_getTaskType()
   {
+    workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle);
     Assert.assertEquals(task.getType(), workItem.getTaskType());
   }
 
   @Test
   public void test_getDataSource()
   {
+    workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle);
     Assert.assertEquals(task.getDataSource(), workItem.getDataSource());
   }
 
   @Test
   public void test_getTask()
   {
+    workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle);
     Assert.assertEquals(task, workItem.getTask());
   }
+
+  @Test
+  public void test_peonLifeycle()
+  {
+    workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle);
+    Assert.assertEquals(kubernetesPeonLifecycle, workItem.getPeonLifeycle());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to