This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5764183d4e3 k8s-based-ingestion: Wait for task lifecycles to enter
RUNNING state before returning from KubernetesTaskRunner.start (#17446)
5764183d4e3 is described below
commit 5764183d4e3c3d8267e98eced05858d5524816c9
Author: George Shiqi Wu <[email protected]>
AuthorDate: Fri Nov 8 08:13:35 2024 -0800
k8s-based-ingestion: Wait for task lifecycles to enter RUNNING state before
returning from KubernetesTaskRunner.start (#17446)
* Add a wait on start() for task lifecycle to go into running
* handle exceptions
* Fix logging messages
* Don't pass in the settable future as an arg
* add some unit tests
---
docs/development/extensions-contrib/k8s-jobs.md | 1 +
.../k8s/overlord/KubernetesPeonLifecycle.java | 41 ++++++-
.../druid/k8s/overlord/KubernetesTaskRunner.java | 79 ++++++++++---
.../k8s/overlord/KubernetesTaskRunnerConfig.java | 28 ++++-
.../druid/k8s/overlord/KubernetesWorkItem.java | 28 ++---
.../k8s/overlord/KubernetesPeonLifecycleTest.java | 51 ++++++++-
.../k8s/overlord/KubernetesTaskRunnerTest.java | 122 ++++++++++++++++-----
.../druid/k8s/overlord/KubernetesWorkItemTest.java | 93 ++++++----------
8 files changed, 311 insertions(+), 132 deletions(-)
diff --git a/docs/development/extensions-contrib/k8s-jobs.md
b/docs/development/extensions-contrib/k8s-jobs.md
index 913e40b9373..11663642bdd 100644
--- a/docs/development/extensions-contrib/k8s-jobs.md
+++ b/docs/development/extensions-contrib/k8s-jobs.md
@@ -610,6 +610,7 @@ Druid selects the pod template
`podSpecWithHighMemRequests.yaml`.
|`druid.indexer.runner.maxTaskDuration`| `Duration` | Max time a task is
allowed to run for before getting killed
|`PT4H`|No|
|`druid.indexer.runner.taskCleanupDelay`| `Duration` | How long do jobs
stay around before getting reaped from K8s
|`P2D`|No|
|`druid.indexer.runner.taskCleanupInterval`| `Duration` | How often to
check for jobs to be reaped
|`PT10M`|No|
+|`druid.indexer.runner.taskJoinTimeout`| `Duration` | How long to wait on startup for tasks from existing peon jobs to start running before locating the remaining ones asynchronously |`PT1M`|No|
|`druid.indexer.runner.K8sjobLaunchTimeout`| `Duration` | How long to
wait to launch a K8s task before marking it as failed, on a resource
constrained cluster it may take some time.
|`PT1H`|No|
|`druid.indexer.runner.javaOptsArray`| `JsonArray` | java opts for the
task.
|`-Xmx1g`|No|
|`druid.indexer.runner.labels`| `JsonObject` | Additional labels you want
to add to peon pod
|`{}`|No|
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index 59e1d0f88b3..4dffaf5d0fc 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -22,6 +22,8 @@ package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodStatus;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -90,6 +92,8 @@ public class KubernetesPeonLifecycle
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;
+ private final SettableFuture<Boolean> taskStartedSuccessfullyFuture;
+
@MonotonicNonNull
private LogWatch logWatch;
@@ -109,6 +113,7 @@ public class KubernetesPeonLifecycle
this.taskLogs = taskLogs;
this.mapper = mapper;
this.stateListener = stateListener;
+ this.taskStartedSuccessfullyFuture = SettableFuture.create();
}
/**
@@ -137,11 +142,13 @@ public class KubernetesPeonLifecycle
launchTimeout,
TimeUnit.MILLISECONDS
);
-
return join(timeout);
}
catch (Exception e) {
log.info("Failed to run task: %s", taskId.getOriginalTaskId());
+ if (!taskStartedSuccessfullyFuture.isDone()) {
+ taskStartedSuccessfullyFuture.set(false);
+ }
throw e;
}
finally {
@@ -179,7 +186,7 @@ public class KubernetesPeonLifecycle
{
try {
updateState(new State[]{State.NOT_STARTED, State.PENDING},
State.RUNNING);
-
+ taskStartedSuccessfullyFuture.set(true);
JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
taskId,
timeout,
@@ -188,6 +195,12 @@ public class KubernetesPeonLifecycle
return getTaskStatus(jobResponse.getJobDuration());
}
+ catch (Exception e) {
+ if (!taskStartedSuccessfullyFuture.isDone()) {
+ taskStartedSuccessfullyFuture.set(false);
+ }
+ throw e;
+ }
finally {
try {
saveLogs();
@@ -195,7 +208,6 @@ public class KubernetesPeonLifecycle
catch (Exception e) {
log.warn(e, "Log processing failed for task [%s]", taskId);
}
-
stopTask();
}
}
@@ -246,7 +258,10 @@ public class KubernetesPeonLifecycle
protected TaskLocation getTaskLocation()
{
if (State.PENDING.equals(state.get()) ||
State.NOT_STARTED.equals(state.get())) {
- log.debug("Can't get task location for non-running job. [%s]",
taskId.getOriginalTaskId());
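+ /* This should rarely happen: KubernetesTaskRunner.start() waits up to taskJoinTimeout
+ for previously running tasks to reach State.RUNNING before returning. It can still
+ happen for tasks that have not started running yet, e.g. ones that did not reach
+ State.RUNNING within that timeout and are being located asynchronously.
+ */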
+ log.warn("Can't get task location for non-running job. [%s]",
taskId.getOriginalTaskId());
return TaskLocation.unknown();
}
@@ -257,6 +272,10 @@ public class KubernetesPeonLifecycle
if (taskLocation == null) {
Optional<Pod> maybePod =
kubernetesClient.getPeonPod(taskId.getK8sJobName());
if (!maybePod.isPresent()) {
+ /* Arguably we should throw an exception here but leaving it as a warn
log to prevent unexpected errors.
+ If there is strange behavior during overlord restarts the operator
should look for this warn log.
+ */
+ log.warn("Could not get task location from k8s for task [%s].",
taskId);
return TaskLocation.unknown();
}
@@ -264,6 +283,7 @@ public class KubernetesPeonLifecycle
PodStatus podStatus = pod.getStatus();
if (podStatus == null || podStatus.getPodIP() == null) {
+ log.warn("Could not get task location from k8s for task [%s].",
taskId);
return TaskLocation.unknown();
}
taskLocation = TaskLocation.create(
@@ -376,4 +396,17 @@ public class KubernetesPeonLifecycle
);
stateListener.stateChanged(state.get(), taskId.getOriginalTaskId());
}
+
+ /**
+ * Returns the {@link ListenableFuture} that reports whether the task started successfully.
+ *
+ * <p>The future completes with {@code true} once the lifecycle transitions to
+ * RUNNING in {@code join}, and with {@code false} if {@code run} or {@code join}
+ * fails before that transition.
+ *
+ * @return a {@link ListenableFuture} representing whether the task started
successfully.
+ */
+ protected ListenableFuture<Boolean> getTaskStartedSuccessfullyFuture()
+ {
+ return taskStartedSuccessfullyFuture;
+ }
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index c324b49e13a..deb1f0b3d9c 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@@ -55,12 +56,14 @@ import
org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -146,16 +149,28 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
public ListenableFuture<TaskStatus> run(Task task)
{
synchronized (tasks) {
- return tasks.computeIfAbsent(task.getId(), k -> new
KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
- .getResult();
+ return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(
+ task,
+ exec.submit(() -> runTask(task)),
+ peonLifecycleFactory.build(
+ task,
+ this::emitTaskStateMetrics
+ )
+ )).getResult();
}
}
- protected ListenableFuture<TaskStatus> joinAsync(Task task)
+ protected KubernetesWorkItem joinAsync(Task task)
{
synchronized (tasks) {
- return tasks.computeIfAbsent(task.getId(), k -> new
KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
- .getResult();
+ return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(
+ task,
+ exec.submit(() -> joinTask(task)),
+ peonLifecycleFactory.build(
+ task,
+ this::emitTaskStateMetrics
+ )
+ ));
}
}
@@ -173,10 +188,7 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
protected TaskStatus doTask(Task task, boolean run)
{
try {
- KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(
- task,
- this::emitTaskStateMetrics
- );
+ KubernetesPeonLifecycle peonLifecycle;
synchronized (tasks) {
KubernetesWorkItem workItem = tasks.get(task.getId());
@@ -185,7 +197,7 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
throw new ISE("Task [%s] has been shut down", task.getId());
}
- workItem.setKubernetesPeonLifecycle(peonLifecycle);
+ peonLifecycle = workItem.getPeonLifeycle();
}
TaskStatus taskStatus;
@@ -321,16 +333,53 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
public void start()
{
log.info("Starting K8sTaskRunner...");
- // Load tasks from previously running jobs and wait for their statuses to
be updated asynchronously.
- for (Job job : client.getPeonJobs()) {
+ // Load tasks from previously running jobs and wait (up to taskJoinTimeout)
for them to start running.
+ final List<ListenableFuture<Boolean>> taskStatusActiveList = new
ArrayList<>();
+ final List<Job> peonJobs = client.getPeonJobs();
+
+ log.info("Locating [%,d] active tasks.", peonJobs.size());
+ for (Job job : peonJobs) {
try {
- joinAsync(adapter.toTask(job));
+ KubernetesWorkItem kubernetesWorkItem = joinAsync(adapter.toTask(job));
+
taskStatusActiveList.add(kubernetesWorkItem.getPeonLifeycle().getTaskStartedSuccessfullyFuture());
}
catch (IOException e) {
log.error(e, "Error deserializing task from job [%s]",
job.getMetadata().getName());
}
}
- log.info("Loaded %,d tasks from previous run", tasks.size());
+
+ try {
+ final DateTime nowUtc = DateTimes.nowUtc();
+ final long timeoutMs =
nowUtc.plus(config.getTaskJoinTimeout()).getMillis() - nowUtc.getMillis();
+ if (timeoutMs > 0) {
+ FutureUtils.coalesce(taskStatusActiveList).get(timeoutMs,
TimeUnit.MILLISECONDS);
+ }
+ log.info("Located [%,d] active tasks.", taskStatusActiveList.size());
+ }
+ catch (Exception e) {
+ final long numInitialized =
+ tasks.values()
+ .stream()
+ .filter(item -> {
+ if
(item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().isDone()) {
+ try {
+ return
item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().get();
+ }
+ catch (InterruptedException | ExecutionException ex) {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }).count();
+ log.warn(
+ e,
+ "Located [%,d] out of [%,d] active tasks (timeout = %s). Locating
others asynchronously.",
+ numInitialized,
+ taskStatusActiveList.size(),
+ config.getTaskJoinTimeout()
+ );
+ }
cleanupExecutor.scheduleAtFixedRate(
() ->
@@ -342,7 +391,7 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
config.getTaskCleanupInterval().toStandardDuration().getMillis(),
TimeUnit.MILLISECONDS
);
- log.debug("Started cleanup executor for jobs older than %s...",
config.getTaskCleanupDelay());
+ log.info("Started cleanup executor for jobs older than %s...",
config.getTaskCleanupDelay());
}
@Override
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
index 60efa3c4856..106378f57aa 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
@@ -85,6 +85,12 @@ public class KubernetesTaskRunnerConfig
// interval for k8s job cleanup to run
private Period taskCleanupInterval = new Period("PT10m");
+ @JsonProperty
+ @NotNull
+ // how long to wait to join peon k8s jobs on startup
+ private Period taskJoinTimeout = new Period("PT1M");
+
+
@JsonProperty
@NotNull
// how long to wait for the peon k8s job to launch
@@ -140,7 +146,8 @@ public class KubernetesTaskRunnerConfig
int cpuCoreInMicro,
Map<String, String> labels,
Map<String, String> annotations,
- Integer capacity
+ Integer capacity,
+ Period taskJoinTimeout
)
{
this.namespace = namespace;
@@ -181,6 +188,10 @@ public class KubernetesTaskRunnerConfig
k8sjobLaunchTimeout,
this.k8sjobLaunchTimeout
);
+ this.taskJoinTimeout = ObjectUtils.defaultIfNull(
+ taskJoinTimeout,
+ this.taskJoinTimeout
+ );
this.peonMonitors = ObjectUtils.defaultIfNull(
peonMonitors,
this.peonMonitors
@@ -247,6 +258,11 @@ public class KubernetesTaskRunnerConfig
{
return maxTaskDuration;
}
+ public Period getTaskJoinTimeout()
+ {
+ return taskJoinTimeout;
+ }
+
public Period getTaskCleanupDelay()
{
@@ -317,6 +333,7 @@ public class KubernetesTaskRunnerConfig
private Map<String, String> labels;
private Map<String, String> annotations;
private Integer capacity;
+ private Period taskJoinTimeout;
public Builder()
{
@@ -425,6 +442,12 @@ public class KubernetesTaskRunnerConfig
return this;
}
+ public Builder withTaskJoinTimeout(Period taskJoinTimeout)
+ {
+ this.taskJoinTimeout = taskJoinTimeout;
+ return this;
+ }
+
public KubernetesTaskRunnerConfig build()
{
return new KubernetesTaskRunnerConfig(
@@ -444,7 +467,8 @@ public class KubernetesTaskRunnerConfig
this.cpuCoreInMicro,
this.labels,
this.annotations,
- this.capacity
+ this.capacity,
+ this.taskJoinTimeout
);
}
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
index 94d4bbb67f6..5eb55b097b4 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
@@ -20,7 +20,6 @@
package org.apache.druid.k8s.overlord;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
@@ -34,27 +33,19 @@ import java.io.InputStream;
public class KubernetesWorkItem extends TaskRunnerWorkItem
{
private final Task task;
- private KubernetesPeonLifecycle kubernetesPeonLifecycle = null;
+ private final KubernetesPeonLifecycle kubernetesPeonLifecycle;
- public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus>
statusFuture)
+ public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus>
statusFuture, KubernetesPeonLifecycle kubernetesPeonLifecycle)
{
super(task.getId(), statusFuture);
this.task = task;
- }
-
- protected synchronized void
setKubernetesPeonLifecycle(KubernetesPeonLifecycle kubernetesPeonLifecycle)
- {
- Preconditions.checkState(this.kubernetesPeonLifecycle == null);
this.kubernetesPeonLifecycle = kubernetesPeonLifecycle;
}
protected synchronized void shutdown()
{
-
- if (this.kubernetesPeonLifecycle != null) {
- this.kubernetesPeonLifecycle.startWatchingLogs();
- this.kubernetesPeonLifecycle.shutdown();
- }
+ this.kubernetesPeonLifecycle.startWatchingLogs();
+ this.kubernetesPeonLifecycle.shutdown();
}
protected boolean isPending()
@@ -88,18 +79,12 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
protected Optional<InputStream> streamTaskLogs()
{
- if (kubernetesPeonLifecycle == null) {
- return Optional.absent();
- }
return kubernetesPeonLifecycle.streamLogs();
}
@Override
public TaskLocation getLocation()
{
- if (kubernetesPeonLifecycle == null) {
- return TaskLocation.unknown();
- }
return kubernetesPeonLifecycle.getTaskLocation();
}
@@ -119,4 +104,9 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
{
return task;
}
+
+ protected KubernetesPeonLifecycle getPeonLifeycle()
+ {
+ return this.kubernetesPeonLifecycle;
+ }
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 13ad4fda209..85ea7072b85 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -52,6 +52,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -264,6 +265,53 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED,
peonLifecycle.getState());
}
+ @Test
+ public void test_run_whenExceptionRaised_setsStartStatusFutureToFalse()
throws ExecutionException, InterruptedException
+ {
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ )
+ {
+ @Override
+ protected synchronized TaskStatus join(long timeout)
+ {
+ throw new IllegalStateException();
+ }
+ };
+
+ Job job = new
JobBuilder().withNewMetadata().withName(ID).endMetadata().build();
+
+ EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart(
+ EasyMock.eq(job),
+ EasyMock.eq(task),
+ EasyMock.anyLong(),
+ EasyMock.eq(TimeUnit.MILLISECONDS)
+ )).andReturn(null);
+ Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
+ EasyMock.expectLastCall().once();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+ EasyMock.expectLastCall().once();
+
+ replayAll();
+
+ Assert.assertThrows(
+ Exception.class,
+ () -> peonLifecycle.run(job, 0L, 0L, false)
+ );
+
+ verifyAll();
+
+ Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED,
peonLifecycle.getState());
+
Assert.assertTrue(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone());
+ Assert.assertFalse(peonLifecycle.getTaskStartedSuccessfullyFuture().get());
+
+ }
+
@Test
public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException
{
@@ -313,6 +361,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
stateListener
);
+
Assert.assertFalse(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone());
Job job = new JobBuilder()
.withNewMetadata()
.withName(ID)
@@ -347,7 +396,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
TaskStatus taskStatus = peonLifecycle.join(0L);
verifyAll();
-
+
Assert.assertTrue(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone());
Assert.assertEquals(SUCCESS.withDuration(58000), taskStatus);
Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED,
peonLifecycle.getState());
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 67a5278c6a3..a6c01ee306a 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import org.apache.commons.io.IOUtils;
@@ -103,6 +104,8 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void test_start_withExistingJobs() throws IOException
{
+ SettableFuture<Boolean> settableFuture = SettableFuture.create();
+ settableFuture.set(true);
KubernetesTaskRunner runner = new KubernetesTaskRunner(
taskAdapter,
config,
@@ -113,15 +116,16 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
)
{
@Override
- protected ListenableFuture<TaskStatus> joinAsync(Task task)
+ protected KubernetesWorkItem joinAsync(Task task)
{
return tasks.computeIfAbsent(
task.getId(),
k -> new KubernetesWorkItem(
task,
- Futures.immediateFuture(TaskStatus.success(task.getId()))
+ Futures.immediateFuture(TaskStatus.success(task.getId())),
+ kubernetesPeonLifecycle
)
- ).getResult();
+ );
}
};
@@ -133,6 +137,67 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);
+
EasyMock.expect(kubernetesPeonLifecycle.getTaskStartedSuccessfullyFuture()).andReturn(
+ settableFuture
+ );
+
+ replayAll();
+
+ runner.start();
+
+ verifyAll();
+
+ Assert.assertNotNull(runner.tasks);
+ Assert.assertEquals(1, runner.tasks.size());
+ }
+
+ @Test
+ public void test_start_withExistingJobs_oneJobFails() throws IOException
+ {
+ SettableFuture<Boolean> settableFuture = SettableFuture.create();
+ settableFuture.set(true);
+ KubernetesTaskRunner runner = new KubernetesTaskRunner(
+ taskAdapter,
+ config,
+ peonClient,
+ httpClient,
+ new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+ emitter
+ )
+ {
+ @Override
+ protected KubernetesWorkItem joinAsync(Task task)
+ {
+ return tasks.computeIfAbsent(
+ task.getId(),
+ k -> new KubernetesWorkItem(
+ task,
+ Futures.immediateFuture(TaskStatus.success(task.getId())),
+ kubernetesPeonLifecycle
+ )
+ );
+ }
+ };
+
+ Job job = new JobBuilder()
+ .withNewMetadata()
+ .withName(ID)
+ .endMetadata()
+ .build();
+
+ Job job2 = new JobBuilder()
+ .withNewMetadata()
+ .withName("id2")
+ .endMetadata()
+ .build();
+
+ EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job,
job2));
+ EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);
+ EasyMock.expect(taskAdapter.toTask(job2)).andThrow(new
IOException("deserialization exception"));
+
+
EasyMock.expect(kubernetesPeonLifecycle.getTaskStartedSuccessfullyFuture()).andReturn(
+ settableFuture
+ );
replayAll();
@@ -157,10 +222,9 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
)
{
@Override
- protected ListenableFuture<TaskStatus> joinAsync(Task task)
+ protected KubernetesWorkItem joinAsync(Task task)
{
- return tasks.computeIfAbsent(task.getId(), k -> new
KubernetesWorkItem(task, null))
- .getResult();
+ return tasks.computeIfAbsent(task.getId(), k -> new
KubernetesWorkItem(task, null, kubernetesPeonLifecycle));
}
};
@@ -193,7 +257,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void test_streamTaskLog_withExistingTask() throws IOException
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null)
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle)
{
@Override
protected Optional<InputStream> streamTaskLogs()
@@ -241,7 +305,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void test_run_withExistingTask_returnsExistingWorkItem()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle);
runner.tasks.put(task.getId(), workItem);
ListenableFuture<TaskStatus> future = runner.run(task);
@@ -286,8 +350,8 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
replayAll();
- ListenableFuture<TaskStatus> future = runner.joinAsync(task);
- Assert.assertEquals(taskStatus, future.get());
+ KubernetesWorkItem workItem = runner.joinAsync(task);
+ Assert.assertEquals(taskStatus, workItem.getResult().get());
verifyAll();
}
@@ -295,7 +359,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void test_join_withExistingTask_returnsExistingWorkItem()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle);
runner.tasks.put(task.getId(), workItem);
ListenableFuture<TaskStatus> future = runner.run(task);
@@ -310,9 +374,9 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
replayAll();
- ListenableFuture<TaskStatus> future = runner.joinAsync(task);
+ KubernetesWorkItem workItem = runner.joinAsync(task);
- Exception e = Assert.assertThrows(ExecutionException.class, future::get);
+ Exception e = Assert.assertThrows(ExecutionException.class, () ->
workItem.getResult().get());
Assert.assertTrue(e.getCause() instanceof RuntimeException);
verifyAll();
@@ -331,7 +395,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void test_shutdown_withExistingTask_removesTaskFromMap()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture) {
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture,
kubernetesPeonLifecycle) {
@Override
protected synchronized void shutdown()
{
@@ -348,7 +412,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void
test_shutdown_withExistingTask_futureIncomplete_removesTaskFromMap()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture) {
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture,
kubernetesPeonLifecycle) {
@Override
protected synchronized void shutdown()
{
@@ -385,7 +449,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void test_getKnownTasks()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle);
runner.tasks.put(task.getId(), workItem);
@@ -399,7 +463,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
public void test_getRunningTasks()
{
Task pendingTask = K8sTestUtils.createTask("pending-id", 0);
- KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask,
null) {
+ KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask,
null, kubernetesPeonLifecycle) {
@Override
protected RunnerTaskState getRunnerTaskState()
{
@@ -409,7 +473,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
runner.tasks.put(pendingTask.getId(), pendingWorkItem);
Task runningTask = K8sTestUtils.createTask("running-id", 0);
- KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask,
null) {
+ KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask,
null, kubernetesPeonLifecycle) {
@Override
protected RunnerTaskState getRunnerTaskState()
{
@@ -428,7 +492,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
public void test_getPendingTasks()
{
Task pendingTask = K8sTestUtils.createTask("pending-id", 0);
- KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask,
null) {
+ KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask,
null, kubernetesPeonLifecycle) {
@Override
protected RunnerTaskState getRunnerTaskState()
{
@@ -438,7 +502,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
runner.tasks.put(pendingTask.getId(), pendingWorkItem);
Task runningTask = K8sTestUtils.createTask("running-id", 0);
- KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask,
null) {
+ KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask,
null, kubernetesPeonLifecycle) {
@Override
protected RunnerTaskState getRunnerTaskState()
{
@@ -462,7 +526,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void test_getRunnerTaskState_withExistingTask()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
@Override
protected RunnerTaskState getRunnerTaskState()
{
@@ -477,7 +541,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void test_streamTaskReports_withExistingTask() throws Exception
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
@Override
public TaskLocation getLocation()
{
@@ -512,7 +576,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void
test_streamTaskReports_withUnknownTaskLocation_returnsEmptyOptional() throws
Exception
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
@Override
public TaskLocation getLocation()
{
@@ -529,7 +593,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void
test_streamTaskReports_whenInterruptedExceptionThrown_throwsRuntimeException()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
@Override
public TaskLocation getLocation()
{
@@ -593,7 +657,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void
test_streamTaskReports_whenExecutionExceptionThrown_throwsRuntimeException()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
@Override
public TaskLocation getLocation()
{
@@ -618,7 +682,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void test_metricsReported_whenTaskStateChange()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
@Override
public TaskLocation getLocation()
{
@@ -640,7 +704,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void test_getTaskLocation_withExistingTask()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
@Override
public TaskLocation getLocation()
{
@@ -657,7 +721,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void test_getTaskLocation_throws()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null)
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle)
{
@Override
public TaskLocation getLocation()
@@ -689,7 +753,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
public void test_getUsedCapacity()
{
Assert.assertEquals(0, runner.getUsedCapacity());
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle);
runner.tasks.put(task.getId(), workItem);
Assert.assertEquals(1, runner.getUsedCapacity());
runner.tasks.remove(task.getId());
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
index 7d17193b171..fe2b576bccb 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
@@ -45,36 +45,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
public void setup()
{
task = NoopTask.create();
- workItem = new KubernetesWorkItem(task, null);
- }
-
- @Test
- public void
test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException()
- {
- workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle(
- task,
- null,
- null,
- null,
- null
- ));
-
- Assert.assertThrows(
- IllegalStateException.class,
- () -> workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle(
- task,
- null,
- null,
- null,
- null
- ))
- );
- }
-
- @Test
- public void test_shutdown_withoutKubernetesPeonLifecycle()
- {
- workItem.shutdown();
}
@Test
@@ -86,7 +56,11 @@ public class KubernetesWorkItemTest extends EasyMockSupport
EasyMock.expectLastCall();
replayAll();
- workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle);
+ workItem = new KubernetesWorkItem(
+ task,
+ null,
+ kubernetesPeonLifecycle
+ );
workItem.shutdown();
verifyAll();
@@ -95,7 +69,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
@Test
public void test_isPending_withTaskStateWaiting_returnsFalse()
{
- workItem = new KubernetesWorkItem(task, null) {
+ workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
@Override
protected RunnerTaskState getRunnerTaskState()
{
@@ -108,7 +82,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
@Test
public void test_isPending_withTaskStatePending_returnsTrue()
{
- workItem = new KubernetesWorkItem(task, null) {
+ workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
@Override
protected RunnerTaskState getRunnerTaskState()
{
@@ -121,7 +95,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
@Test
public void test_isRunning_withTaskStateWaiting_returnsFalse()
{
- workItem = new KubernetesWorkItem(task, null) {
+ workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
@Override
protected RunnerTaskState getRunnerTaskState()
{
@@ -134,7 +108,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
@Test
public void test_isRunning_withTaskStatePending_returnsTrue()
{
- workItem = new KubernetesWorkItem(task, null) {
+ workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) {
@Override
protected RunnerTaskState getRunnerTaskState()
{
@@ -144,22 +118,17 @@ public class KubernetesWorkItemTest extends
EasyMockSupport
Assert.assertTrue(workItem.isRunning());
}
- @Test
- public void
test_getRunnerTaskState_withoutKubernetesPeonLifecycle_returnsPending()
- {
- Assert.assertEquals(RunnerTaskState.PENDING,
workItem.getRunnerTaskState());
- }
-
@Test
public void
test_getRunnerTaskState_withKubernetesPeonLifecycle_returnsPending()
{
- workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle(
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
null,
null,
null,
null
- ));
+ );
+ workItem = new KubernetesWorkItem(task, null, peonLifecycle);
Assert.assertEquals(RunnerTaskState.PENDING,
workItem.getRunnerTaskState());
}
@@ -181,7 +150,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
}
};
- workItem.setKubernetesPeonLifecycle(peonLifecycle);
+ workItem = new KubernetesWorkItem(task, null, peonLifecycle);
Assert.assertEquals(RunnerTaskState.PENDING,
workItem.getRunnerTaskState());
}
@@ -203,7 +172,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
}
};
- workItem.setKubernetesPeonLifecycle(peonLifecycle);
+ workItem = new KubernetesWorkItem(task, null, peonLifecycle);
Assert.assertEquals(RunnerTaskState.RUNNING,
workItem.getRunnerTaskState());
}
@@ -225,46 +194,36 @@ public class KubernetesWorkItemTest extends
EasyMockSupport
}
};
- workItem.setKubernetesPeonLifecycle(peonLifecycle);
+ workItem = new KubernetesWorkItem(task, null, peonLifecycle);
Assert.assertEquals(RunnerTaskState.NONE, workItem.getRunnerTaskState());
}
- @Test
- public void test_streamTaskLogs_withoutKubernetesPeonLifecycle()
- {
- Assert.assertFalse(workItem.streamTaskLogs().isPresent());
- }
-
@Test
public void test_streamTaskLogs_withKubernetesPeonLifecycle()
{
- workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle(
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
null,
null,
null,
null
- ));
+ );
+ workItem = new KubernetesWorkItem(task, null, peonLifecycle);
Assert.assertFalse(workItem.streamTaskLogs().isPresent());
}
- @Test
- public void test_getLocation_withoutKubernetesPeonLifecycle()
- {
- Assert.assertEquals(TaskLocation.unknown(), workItem.getLocation());
- }
-
@Test
public void test_getLocation_withKubernetesPeonLifecycle()
{
- workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle(
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
null,
null,
null,
null
- ));
+ );
+ workItem = new KubernetesWorkItem(task, null, peonLifecycle);
Assert.assertEquals(TaskLocation.unknown(), workItem.getLocation());
}
@@ -272,18 +231,28 @@ public class KubernetesWorkItemTest extends
EasyMockSupport
@Test
public void test_getTaskType()
{
+ workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle);
Assert.assertEquals(task.getType(), workItem.getTaskType());
}
@Test
public void test_getDataSource()
{
+ workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle);
Assert.assertEquals(task.getDataSource(), workItem.getDataSource());
}
@Test
public void test_getTask()
{
+ workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle);
Assert.assertEquals(task, workItem.getTask());
}
+
+ @Test
+ public void test_peonLifeycle()
+ {
+ workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle);
+ Assert.assertEquals(kubernetesPeonLifecycle, workItem.getPeonLifeycle());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]