This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 130bfbfc6d7 Revert "Separate task lifecycle from kubernetes/location
lifecycle (#15133)" (#15346)
130bfbfc6d7 is described below
commit 130bfbfc6d7d30b6ee2efab90aea415245d9313e
Author: George Shiqi Wu <[email protected]>
AuthorDate: Wed Nov 8 13:12:30 2023 -0500
Revert "Separate task lifecycle from kubernetes/location lifecycle
(#15133)" (#15346)
This reverts commit dc0b163e192545c802b7fe2b3271e035cc1e70ff.
---
.../overlord/KubernetesAndWorkerTaskRunner.java | 6 +
.../k8s/overlord/KubernetesPeonLifecycle.java | 25 +---
.../overlord/KubernetesPeonLifecycleFactory.java | 10 +-
.../druid/k8s/overlord/KubernetesTaskRunner.java | 47 +++----
.../k8s/overlord/KubernetesTaskRunnerConfig.java | 2 +-
.../druid/k8s/overlord/KubernetesWorkItem.java | 23 +---
.../druid/k8s/overlord/PeonLifecycleFactory.java | 7 +-
.../druid/k8s/overlord/common/JobResponse.java | 9 +-
.../k8s/overlord/common/KubernetesPeonClient.java | 6 +-
.../druid/k8s/overlord/common/PeonPhase.java | 62 +++++++++
.../KubernetesAndWorkerTaskRunnerTest.java | 9 ++
.../k8s/overlord/KubernetesPeonLifecycleTest.java | 140 ++++++---------------
.../overlord/KubernetesTaskRunnerConfigTest.java | 4 +-
.../k8s/overlord/KubernetesTaskRunnerTest.java | 102 ++-------------
.../druid/k8s/overlord/KubernetesWorkItemTest.java | 11 +-
.../k8s/overlord/TestPeonLifecycleFactory.java | 7 +-
.../druid/k8s/overlord/common/JobResponseTest.java | 8 +-
.../overlord/common/KubernetesPeonClientTest.java | 3 +
.../druid/k8s/overlord/common/PeonPhaseTest.java} | 27 ++--
.../DruidPeonClientIntegrationTest.java | 5 +-
.../test/resources/expectedEphemeralOutput.yaml | 2 +-
.../resources/expectedMultiContainerOutput.yaml | 2 +-
.../expectedMultiContainerOutputOrder.yaml | 2 +-
.../src/test/resources/expectedNoopJob.yaml | 2 +-
.../src/test/resources/expectedNoopJobLongIds.yaml | 2 +-
.../test/resources/expectedNoopJobNoTaskJson.yaml | 2 +-
.../test/resources/expectedNoopJobTlsEnabled.yaml | 2 +-
.../src/test/resources/expectedPodSpec.yaml | 2 +-
.../resources/expectedSingleContainerOutput.yaml | 2 +-
.../common/actions/UpdateLocationAction.java | 12 +-
.../druid/indexing/common/task/AbstractTask.java | 10 ++
.../apache/druid/indexing/overlord/TaskRunner.java | 5 +
.../common/actions/UpdateLocationActionTest.java | 71 +++++++++++
.../indexing/common/task/AbstractTaskTest.java | 2 +-
34 files changed, 299 insertions(+), 332 deletions(-)
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
index d8f4e9d84f2..2c45a0ec7b8 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
@@ -285,4 +285,10 @@ public class KubernetesAndWorkerTaskRunner implements
TaskLogStreamer, WorkerTas
{
kubernetesTaskRunner.updateStatus(task, status);
}
+
+ @Override
+ public void updateLocation(Task task, TaskLocation location)
+ {
+ kubernetesTaskRunner.updateLocation(task, location);
+ }
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index ea3c2a19d1c..5c6c7c6b3eb 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -31,9 +31,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.TaskRunnerListener;
-import org.apache.druid.indexing.overlord.TaskRunnerUtils;
-import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
@@ -50,8 +47,6 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -94,8 +89,6 @@ public class KubernetesPeonLifecycle
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;
- private final List<Pair<TaskRunnerListener, Executor>> listeners;
-
@MonotonicNonNull
private LogWatch logWatch;
@@ -106,8 +99,7 @@ public class KubernetesPeonLifecycle
KubernetesPeonClient kubernetesClient,
TaskLogs taskLogs,
ObjectMapper mapper,
- TaskStateListener stateListener,
- List<Pair<TaskRunnerListener, Executor>> listeners
+ TaskStateListener stateListener
)
{
this.taskId = new K8sTaskId(task);
@@ -116,7 +108,6 @@ public class KubernetesPeonLifecycle
this.taskLogs = taskLogs;
this.mapper = mapper;
this.stateListener = stateListener;
- this.listeners = listeners;
}
/**
@@ -187,11 +178,7 @@ public class KubernetesPeonLifecycle
{
try {
updateState(new State[]{State.NOT_STARTED, State.PENDING},
State.RUNNING);
- TaskRunnerUtils.notifyLocationChanged(
- listeners,
- task.getId(),
- getTaskLocation()
- );
+
JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
taskId,
timeout,
@@ -203,14 +190,12 @@ public class KubernetesPeonLifecycle
finally {
try {
saveLogs();
- shutdown();
}
catch (Exception e) {
- log.warn(e, "Cleanup failed for task [%s]", taskId);
- }
- finally {
- stopTask();
+ log.warn(e, "Log processing failed for task [%s]", taskId);
}
+
+ stopTask();
}
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java
index 2998f3fc921..bf4e3a71257 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java
@@ -21,14 +21,9 @@ package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.TaskRunnerListener;
-import org.apache.druid.java.util.common.Pair;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.tasklogs.TaskLogs;
-import java.util.List;
-import java.util.concurrent.Executor;
-
public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory
{
private final KubernetesPeonClient client;
@@ -47,15 +42,14 @@ public class KubernetesPeonLifecycleFactory implements
PeonLifecycleFactory
}
@Override
- public KubernetesPeonLifecycle build(Task task,
KubernetesPeonLifecycle.TaskStateListener stateListener,
List<Pair<TaskRunnerListener, Executor>> listeners)
+ public KubernetesPeonLifecycle build(Task task,
KubernetesPeonLifecycle.TaskStateListener stateListener)
{
return new KubernetesPeonLifecycle(
task,
client,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
}
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 56de37c3798..a0a29dcbbb9 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -146,20 +146,16 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
public ListenableFuture<TaskStatus> run(Task task)
{
synchronized (tasks) {
- return tasks.computeIfAbsent(task.getId(), k -> {
- ListenableFuture<TaskStatus> unused = exec.submit(() -> runTask(task));
- return new KubernetesWorkItem(task);
- }).getResult();
+ return tasks.computeIfAbsent(task.getId(), k -> new
KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
+ .getResult();
}
}
protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
synchronized (tasks) {
- return tasks.computeIfAbsent(task.getId(), k -> {
- ListenableFuture<TaskStatus> unused = exec.submit(() ->
joinTask(task));
- return new KubernetesWorkItem(task);
- }).getResult();
+ return tasks.computeIfAbsent(task.getId(), k -> new
KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
+ .getResult();
}
}
@@ -176,12 +172,10 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
@VisibleForTesting
protected TaskStatus doTask(Task task, boolean run)
{
- TaskStatus taskStatus = TaskStatus.failure(task.getId(), "Task execution
never started");
try {
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(
task,
- this::emitTaskStateMetrics,
- listeners
+ this::emitTaskStateMetrics
);
synchronized (tasks) {
@@ -194,6 +188,7 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
workItem.setKubernetesPeonLifecycle(peonLifecycle);
}
+ TaskStatus taskStatus;
if (run) {
taskStatus = peonLifecycle.run(
adapter.fromTask(task),
@@ -206,17 +201,15 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
config.getTaskTimeout().toStandardDuration().getMillis()
);
}
+
+ updateStatus(task, taskStatus);
+
return taskStatus;
}
catch (Exception e) {
log.error(e, "Task [%s] execution caught an exception", task.getId());
- taskStatus = TaskStatus.failure(task.getId(), "Could not start task
execution");
throw new RuntimeException(e);
}
- finally {
- updateStatus(task, taskStatus);
- TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(),
TaskLocation.unknown());
- }
}
@VisibleForTesting
@@ -249,15 +242,15 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
@Override
public void updateStatus(Task task, TaskStatus status)
{
- KubernetesWorkItem workItem = tasks.get(task.getId());
- if (workItem != null && !workItem.getResult().isDone() &&
status.isComplete()) {
- workItem.setResult(status);
- }
-
- // Notify listeners even if the result is set to handle the shutdown case.
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}
+ @Override
+ public void updateLocation(Task task, TaskLocation location)
+ {
+ TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), location);
+ }
+
@Override
public void shutdown(String taskid, String reason)
{
@@ -424,16 +417,6 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener,
executor);
log.debug("Registered listener [%s]", listener.getListenerId());
listeners.add(listenerPair);
-
- for (Map.Entry<String, KubernetesWorkItem> entry : tasks.entrySet()) {
- if (entry.getValue().isRunning()) {
- TaskRunnerUtils.notifyLocationChanged(
- ImmutableList.of(listenerPair),
- entry.getKey(),
- entry.getValue().getLocation()
- );
- }
- }
}
@Override
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
index 5fdcd9cfbb7..0d67c55b30a 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
@@ -78,7 +78,7 @@ public class KubernetesTaskRunnerConfig
@JsonProperty
@NotNull
// how long to wait for the jobs to be cleaned up.
- private Period taskCleanupDelay = new Period("PT1H");
+ private Period taskCleanupDelay = new Period("P2D");
@JsonProperty
@NotNull
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
index b089b4dd2db..94d4bbb67f6 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
@@ -19,10 +19,9 @@
package org.apache.druid.k8s.overlord;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@@ -37,18 +36,9 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
private final Task task;
private KubernetesPeonLifecycle kubernetesPeonLifecycle = null;
- private final SettableFuture<TaskStatus> result;
-
- public KubernetesWorkItem(Task task)
- {
- this(task, SettableFuture.create());
- }
-
- @VisibleForTesting
- public KubernetesWorkItem(Task task, SettableFuture<TaskStatus> result)
+ public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus>
statusFuture)
{
- super(task.getId(), result);
- this.result = result;
+ super(task.getId(), statusFuture);
this.task = task;
}
@@ -61,7 +51,7 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
protected synchronized void shutdown()
{
- if (this.kubernetesPeonLifecycle != null && !result.isDone()) {
+ if (this.kubernetesPeonLifecycle != null) {
this.kubernetesPeonLifecycle.startWatchingLogs();
this.kubernetesPeonLifecycle.shutdown();
}
@@ -129,9 +119,4 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
{
return task;
}
-
- public void setResult(TaskStatus status)
- {
- result.set(status);
- }
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
index 2b180fb9dac..2a234ebc578 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
@@ -20,13 +20,8 @@
package org.apache.druid.k8s.overlord;
import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.TaskRunnerListener;
-import org.apache.druid.java.util.common.Pair;
-
-import java.util.List;
-import java.util.concurrent.Executor;
public interface PeonLifecycleFactory
{
- KubernetesPeonLifecycle build(Task task,
KubernetesPeonLifecycle.TaskStateListener stateListener,
List<Pair<TaskRunnerListener, Executor>> listeners);
+ KubernetesPeonLifecycle build(Task task,
KubernetesPeonLifecycle.TaskStateListener stateListener);
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
index 26fe5b98f0b..a7a8156468f 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
@@ -32,10 +32,12 @@ public class JobResponse
private static final EmittingLogger LOGGER = new
EmittingLogger(JobResponse.class);
private final Job job;
+ private final PeonPhase phase;
- public JobResponse(@Nullable Job job)
+ public JobResponse(@Nullable Job job, PeonPhase phase)
{
this.job = job;
+ this.phase = phase;
}
public Job getJob()
@@ -43,6 +45,11 @@ public class JobResponse
return job;
}
+ public PeonPhase getPhase()
+ {
+ return phase;
+ }
+
public long getJobDuration()
{
long duration = -1L;
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
index 147ea732d0c..9fdc25fa645 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
@@ -101,13 +101,13 @@ public class KubernetesPeonClient
);
if (job == null) {
log.info("K8s job for the task [%s] was not found. It can happen if
the task was canceled", taskId);
- return new JobResponse(null);
+ return new JobResponse(null, PeonPhase.FAILED);
}
if (job.getStatus().getSucceeded() != null) {
- return new JobResponse(job);
+ return new JobResponse(job, PeonPhase.SUCCEEDED);
}
log.warn("Task %s failed with status %s", taskId, job.getStatus());
- return new JobResponse(job);
+ return new JobResponse(job, PeonPhase.FAILED);
});
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java
new file mode 100644
index 00000000000..6efcd34872b
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import io.fabric8.kubernetes.api.model.Pod;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum PeonPhase
+{
+ PENDING("Pending"),
+ SUCCEEDED("Succeeded"),
+ FAILED("Failed"),
+ UNKNOWN("Unknown"),
+ RUNNING("Running");
+
+ private static final Map<String, PeonPhase> PHASE_MAP =
Arrays.stream(PeonPhase.values())
+
.collect(Collectors.toMap(
+
PeonPhase::getPhase,
+
Function.identity()
+ ));
+ private final String phase;
+
+ PeonPhase(String phase)
+ {
+ this.phase = phase;
+ }
+
+ public String getPhase()
+ {
+ return phase;
+ }
+
+ public static PeonPhase getPhaseFor(Pod pod)
+ {
+ if (pod == null) {
+ return UNKNOWN;
+ }
+ return PHASE_MAP.get(pod.getStatus().getPhase());
+ }
+
+}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
index 40ca6fc2f2b..3ab515cc6e5 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
@@ -362,4 +362,13 @@ public class KubernetesAndWorkerTaskRunnerTest extends
EasyMockSupport
runner.updateStatus(task, TaskStatus.running(ID));
verifyAll();
}
+
+ @Test
+ public void test_updateLocation()
+ {
+ kubernetesTaskRunner.updateLocation(task, TaskLocation.unknown());
+ replayAll();
+ runner.updateLocation(task, TaskLocation.unknown());
+ verifyAll();
+ }
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index e984e449b28..1c6e429a3dc 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -32,12 +31,11 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.TaskRunnerListener;
-import org.apache.druid.java.util.common.Pair;
import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
+import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.tasklogs.TaskLogs;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
@@ -52,8 +50,6 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -69,8 +65,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
@Mock LogWatch logWatch;
@Mock KubernetesPeonLifecycle.TaskStateListener stateListener;
- List<Pair<TaskRunnerListener, Executor>> listeners = ImmutableList.of();
-
private ObjectMapper mapper;
private Task task;
private K8sTaskId k8sTaskId;
@@ -92,8 +86,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
)
{
@Override
@@ -138,8 +131,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
)
{
@Override
@@ -183,8 +175,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
)
{
@Override
@@ -233,8 +224,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
)
{
@Override
@@ -278,19 +268,15 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
EasyMock.expect(kubernetesClient.waitForPeonJobCompletion(
EasyMock.eq(k8sTaskId),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
- )).andReturn(new JobResponse(null));
+ )).andReturn(new JobResponse(null, PeonPhase.FAILED));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
-
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent());
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
-
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
@@ -316,15 +302,12 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
@Test
public void test_join() throws IOException
{
- Executor executor = EasyMock.mock(Executor.class);
- TaskRunnerListener taskRunnerListener =
EasyMock.mock(TaskRunnerListener.class);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- ImmutableList.of(Pair.of(taskRunnerListener, executor))
+ stateListener
);
Job job = new JobBuilder()
@@ -342,12 +325,8 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.eq(k8sTaskId),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
- )).andReturn(new JobResponse(job));
+ )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
-
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(
- Optional.of(new
PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build())
- );
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of(
IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS),
StandardCharsets.UTF_8)
));
@@ -359,10 +338,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
- executor.execute(EasyMock.anyObject());
- EasyMock.expectLastCall();
- taskRunnerListener.locationChanged(EasyMock.anyObject(),
EasyMock.anyObject());
- EasyMock.expectLastCall();
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
@@ -384,8 +359,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
Job job = new JobBuilder()
@@ -401,15 +375,8 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.eq(k8sTaskId),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
- )).andReturn(new JobResponse(job));
+ )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
-
- // Only update the location the first time, second call doesn't reach this
point in the logic
-
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(
- Optional.of(new
PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build())
- );
- // Always try to delete the job
-
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true).times(2);
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(
Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS),
StandardCharsets.UTF_8))
);
@@ -452,8 +419,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
Job job = new JobBuilder()
@@ -469,12 +435,8 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.eq(k8sTaskId),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
- )).andReturn(new JobResponse(job));
+ )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
-
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(
- Optional.of(new
PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build())
- );
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
@@ -507,8 +469,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
Job job = new JobBuilder()
@@ -524,12 +485,8 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.eq(k8sTaskId),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
- )).andReturn(new JobResponse(job));
+ )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
-
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(
- Optional.of(new
PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build())
- );
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
@@ -562,8 +519,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
Job job = new JobBuilder()
@@ -579,11 +535,8 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.eq(k8sTaskId),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
- )).andReturn(new JobResponse(job));
+ )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
-
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(
- Optional.of(new
PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build())
- );
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(
Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS),
StandardCharsets.UTF_8))
);
@@ -596,9 +549,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
logWatch.close();
EasyMock.expectLastCall();
- // We should still try to cleanup the Job after
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
-
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
replayAll();
@@ -619,8 +569,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
EasyMock.expect(kubernetesClient.waitForPeonJobCompletion(
@@ -629,9 +578,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andThrow(new RuntimeException());
-
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(
- Optional.of(new
PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build())
- );
// We should still try to push logs
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
@@ -642,7 +588,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
+
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
replayAll();
@@ -662,8 +608,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
peonLifecycle.shutdown();
}
@@ -676,8 +621,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.PENDING);
@@ -698,8 +642,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.RUNNING);
@@ -720,8 +663,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.STOPPED);
@@ -736,8 +678,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.NOT_STARTED);
@@ -752,8 +693,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.PENDING);
@@ -768,8 +708,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.RUNNING);
@@ -792,8 +731,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.STOPPED);
@@ -809,8 +747,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.NOT_STARTED);
@@ -826,8 +763,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.PENDING);
@@ -843,8 +779,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.RUNNING);
@@ -866,8 +801,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.RUNNING);
@@ -895,8 +829,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.RUNNING);
@@ -932,8 +865,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.RUNNING);
@@ -969,8 +901,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.RUNNING);
@@ -1007,8 +938,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
kubernetesClient,
taskLogs,
mapper,
- stateListener,
- listeners
+ stateListener
);
setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.STOPPED);
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()).once();
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java
index 579b7539d81..1f4a7281f64 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java
@@ -47,7 +47,7 @@ public class KubernetesTaskRunnerConfigTest
Assert.assertNull(config.getGraceTerminationPeriodSeconds());
Assert.assertTrue(config.isDisableClientProxy());
Assert.assertEquals(new Period("PT4H"), config.getTaskTimeout());
- Assert.assertEquals(new Period("PT1H"), config.getTaskCleanupDelay());
+ Assert.assertEquals(new Period("P2D"), config.getTaskCleanupDelay());
Assert.assertEquals(new Period("PT10m"), config.getTaskCleanupInterval());
Assert.assertEquals(new Period("PT1H"), config.getTaskLaunchTimeout());
Assert.assertEquals(ImmutableList.of(), config.getPeonMonitors());
@@ -72,7 +72,7 @@ public class KubernetesTaskRunnerConfigTest
Assert.assertNull(config.getGraceTerminationPeriodSeconds());
Assert.assertTrue(config.isDisableClientProxy());
Assert.assertEquals(new Period("PT4H"), config.getTaskTimeout());
- Assert.assertEquals(new Period("PT1H"), config.getTaskCleanupDelay());
+ Assert.assertEquals(new Period("P2D"), config.getTaskCleanupDelay());
Assert.assertEquals(new Period("PT10m"), config.getTaskCleanupInterval());
Assert.assertEquals(new Period("PT1H"), config.getTaskLaunchTimeout());
Assert.assertEquals(ImmutableList.of(), config.getPeonMonitors());
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index e04ef630036..36a7b4cfcd9 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -28,10 +28,8 @@ import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
-import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
@@ -78,9 +76,6 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
@Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle;
@Mock private ServiceEmitter emitter;
- @Mock private Executor executor;
- @Mock private TaskRunnerListener taskRunnerListener;
-
private KubernetesTaskRunnerConfig config;
private KubernetesTaskRunner runner;
private Task task;
@@ -121,7 +116,11 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
{
return tasks.computeIfAbsent(
task.getId(),
- k -> new KubernetesWorkItem(task)).getResult();
+ k -> new KubernetesWorkItem(
+ task,
+ Futures.immediateFuture(TaskStatus.success(task.getId()))
+ )
+ ).getResult();
}
};
@@ -250,7 +249,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
}
@Test
- public void test_run_whenExceptionThrown_throwsRuntimeException() throws
Exception
+ public void test_run_whenExceptionThrown_throwsRuntimeException() throws
IOException
{
Job job = new JobBuilder()
.withNewMetadata()
@@ -270,89 +269,11 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
replayAll();
ListenableFuture<TaskStatus> future = runner.run(task);
- TaskStatus taskStatus = future.get();
- Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
- Assert.assertEquals("Could not start task execution",
taskStatus.getErrorMsg());
- verifyAll();
- }
- @Test
- public void test_run_updateStatus() throws ExecutionException,
InterruptedException
- {
- KubernetesTaskRunner runner = new KubernetesTaskRunner(
- taskAdapter,
- config,
- peonClient,
- httpClient,
- new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
- emitter
- );
+ Exception e = Assert.assertThrows(ExecutionException.class, future::get);
+ Assert.assertTrue(e.getCause() instanceof RuntimeException);
- KubernetesWorkItem workItem = new KubernetesWorkItem(task);
- runner.tasks.put(task.getId(), workItem);
- TaskStatus completeTaskStatus = TaskStatus.success(task.getId());
-
- replayAll();
- runner.updateStatus(task, completeTaskStatus);
verifyAll();
-
- assertTrue(workItem.getResult().isDone());
- assertEquals(completeTaskStatus, workItem.getResult().get());
- }
-
- @Test
- public void test_run_updateStatus_running()
- {
- KubernetesTaskRunner runner = new KubernetesTaskRunner(
- taskAdapter,
- config,
- peonClient,
- httpClient,
- new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
- emitter
- );
- KubernetesWorkItem workItem = new KubernetesWorkItem(task);
- runner.tasks.put(task.getId(), workItem);
- TaskStatus runningTaskStatus = TaskStatus.running(task.getId());
-
- replayAll();
- runner.updateStatus(task, runningTaskStatus);
- verifyAll();
-
- assertFalse(workItem.getResult().isDone());
- }
-
- @Test
- public void test_registerListener_runningTask()
- {
- KubernetesTaskRunner runner = new KubernetesTaskRunner(
- taskAdapter,
- config,
- peonClient,
- httpClient,
- new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
- emitter
- );
-
- KubernetesPeonLifecycle runningKubernetesPeonLifecycle =
EasyMock.mock(KubernetesPeonLifecycle.class);
-
EasyMock.expect(runningKubernetesPeonLifecycle.getState()).andReturn(KubernetesPeonLifecycle.State.RUNNING);
-
EasyMock.expect(runningKubernetesPeonLifecycle.getTaskLocation()).andReturn(TaskLocation.unknown());
- KubernetesWorkItem workItem = new KubernetesWorkItem(task);
- workItem.setKubernetesPeonLifecycle(runningKubernetesPeonLifecycle);
- runner.tasks.put(task.getId(), workItem);
-
- Executor executor = EasyMock.mock(Executor.class);
- TaskRunnerListener taskRunnerListener =
EasyMock.mock(TaskRunnerListener.class);
- executor.execute(EasyMock.anyObject());
- EasyMock.expectLastCall();
- taskRunnerListener.locationChanged(EasyMock.anyObject(),
EasyMock.anyObject());
- EasyMock.expectLastCall();
-
- replayAll();
- EasyMock.replay(runningKubernetesPeonLifecycle);
- runner.registerListener(taskRunnerListener, executor);
- verifyAll();
- EasyMock.verify(runningKubernetesPeonLifecycle);
}
@Test
@@ -382,15 +303,16 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
}
@Test
- public void test_join_whenExceptionThrown_throwsRuntimeException() throws
ExecutionException, InterruptedException
+ public void test_join_whenExceptionThrown_throwsRuntimeException()
{
EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong())).andThrow(new
IllegalStateException());
replayAll();
ListenableFuture<TaskStatus> future = runner.joinAsync(task);
- TaskStatus taskStatus = future.get();
- Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
+
+ Exception e = Assert.assertThrows(ExecutionException.class, future::get);
+ Assert.assertTrue(e.getCause() instanceof RuntimeException);
verifyAll();
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
index f2f398658e0..7d17193b171 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
@@ -56,7 +56,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
null,
null,
null,
- null,
null
));
@@ -67,7 +66,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
null,
null,
null,
- null,
null
))
);
@@ -82,8 +80,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
@Test
public void test_shutdown_withKubernetesPeonLifecycle()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task);
-
kubernetesPeonLifecycle.shutdown();
EasyMock.expectLastCall();
kubernetesPeonLifecycle.startWatchingLogs();
@@ -91,6 +87,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
replayAll();
workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle);
+
workItem.shutdown();
verifyAll();
}
@@ -161,7 +158,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
null,
null,
null,
- null,
null
));
@@ -176,7 +172,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
null,
null,
null,
- null,
null
) {
@Override
@@ -199,7 +194,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
null,
null,
null,
- null,
null
) {
@Override
@@ -222,7 +216,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
null,
null,
null,
- null,
null
) {
@Override
@@ -251,7 +244,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
null,
null,
null,
- null,
null
));
Assert.assertFalse(workItem.streamTaskLogs().isPresent());
@@ -271,7 +263,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport
null,
null,
null,
- null,
null
));
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java
index fa0f79bc1d2..8b8c43c0d71 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java
@@ -20,11 +20,6 @@
package org.apache.druid.k8s.overlord;
import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.TaskRunnerListener;
-import org.apache.druid.java.util.common.Pair;
-
-import java.util.List;
-import java.util.concurrent.Executor;
public class TestPeonLifecycleFactory implements PeonLifecycleFactory
{
@@ -36,7 +31,7 @@ public class TestPeonLifecycleFactory implements
PeonLifecycleFactory
}
@Override
- public KubernetesPeonLifecycle build(Task task,
KubernetesPeonLifecycle.TaskStateListener stateListener,
List<Pair<TaskRunnerListener, Executor>> listeners)
+ public KubernetesPeonLifecycle build(Task task,
KubernetesPeonLifecycle.TaskStateListener stateListener)
{
return kubernetesPeonLifecycle;
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java
index cf9f345fd7a..2e2043578aa 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java
@@ -39,7 +39,7 @@ class JobResponseTest
.endStatus()
.build();
- JobResponse response = new JobResponse(job);
+ JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED);
Assertions.assertEquals(58000L, response.getJobDuration());
}
@@ -56,7 +56,7 @@ class JobResponseTest
.endStatus()
.build();
- JobResponse response = new JobResponse(job);
+ JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED);
Assertions.assertEquals(-1, response.getJobDuration());
}
@@ -70,7 +70,7 @@ class JobResponseTest
.endMetadata()
.build();
- JobResponse response = new JobResponse(job);
+ JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED);
Assertions.assertEquals(-1, response.getJobDuration());
}
@@ -78,7 +78,7 @@ class JobResponseTest
@Test
void testNullJob()
{
- JobResponse response = new JobResponse(null);
+ JobResponse response = new JobResponse(null, PeonPhase.SUCCEEDED);
long duration = response.getJobDuration();
Assertions.assertEquals(-1, duration);
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
index cde7faa473b..f6096b675d6 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
@@ -153,6 +153,7 @@ public class KubernetesPeonClientTest
TimeUnit.SECONDS
);
+ Assertions.assertEquals(PeonPhase.SUCCEEDED, jobResponse.getPhase());
Assertions.assertNotNull(jobResponse.getJob());
}
@@ -177,6 +178,7 @@ public class KubernetesPeonClientTest
TimeUnit.SECONDS
);
+ Assertions.assertEquals(PeonPhase.FAILED, jobResponse.getPhase());
Assertions.assertNotNull(jobResponse.getJob());
}
@@ -189,6 +191,7 @@ public class KubernetesPeonClientTest
TimeUnit.SECONDS
);
+ Assertions.assertEquals(PeonPhase.FAILED, jobResponse.getPhase());
Assertions.assertNull(jobResponse.getJob());
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java
similarity index 53%
copy from
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
copy to
extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java
index 2b180fb9dac..3f6bd71312b 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java
@@ -17,16 +17,27 @@
* under the License.
*/
-package org.apache.druid.k8s.overlord;
+package org.apache.druid.k8s.overlord.common;
-import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.TaskRunnerListener;
-import org.apache.druid.java.util.common.Pair;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import org.junit.jupiter.api.Test;
-import java.util.List;
-import java.util.concurrent.Executor;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
-public interface PeonLifecycleFactory
+public class PeonPhaseTest
{
- KubernetesPeonLifecycle build(Task task,
KubernetesPeonLifecycle.TaskStateListener stateListener,
List<Pair<TaskRunnerListener, Executor>> listeners);
+
+ @Test
+ void testGetPhaseForToMakeCoverageHappy()
+ {
+ Pod pod = mock(Pod.class);
+ PodStatus status = mock(PodStatus.class);
+ when(status.getPhase()).thenReturn("Succeeded");
+ when(pod.getStatus()).thenReturn(status);
+ assertEquals(PeonPhase.UNKNOWN, PeonPhase.getPhaseFor(null));
+ assertEquals(PeonPhase.SUCCEEDED, PeonPhase.getPhaseFor(pod));
+ }
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
index 2cc5bf15c65..09816168588 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
@@ -38,11 +38,13 @@ import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
+import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.k8s.overlord.common.KubernetesClientApi;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
+import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.junit.jupiter.api.BeforeEach;
@@ -182,8 +184,9 @@ public class DruidPeonClientIntegrationTest
assertEquals(task, taskFromPod);
- peonClient.waitForPeonJobCompletion(taskId, 2, TimeUnit.MINUTES);
+ JobResponse jobStatusResult = peonClient.waitForPeonJobCompletion(taskId,
2, TimeUnit.MINUTES);
thread.join();
+ assertEquals(PeonPhase.SUCCEEDED, jobStatusResult.getPhase());
// as long as there were no exceptions we are good!
assertEquals(expectedLogs, actualLogs);
// cleanup my job
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml
index 741a032eb6c..30960cdbc66 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml
@@ -62,4 +62,4 @@ spec:
ephemeral-storage: 1Gi
hostname: "id-3e70afe5cd823dfc7dd308eea616426b"
restartPolicy: "Never"
- ttlSecondsAfterFinished: 3600
\ No newline at end of file
+ ttlSecondsAfterFinished: 172800
\ No newline at end of file
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml
index 73f31ddfb50..70b8b7c1d24 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml
@@ -105,4 +105,4 @@ spec:
name: "graveyard"
- emptyDir: {}
name: "kubexit"
- ttlSecondsAfterFinished: 3600
+ ttlSecondsAfterFinished: 172800
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml
index 73f31ddfb50..70b8b7c1d24 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml
@@ -105,4 +105,4 @@ spec:
name: "graveyard"
- emptyDir: {}
name: "kubexit"
- ttlSecondsAfterFinished: 3600
+ ttlSecondsAfterFinished: 172800
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
index 004fed9585a..2cef837f397 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
@@ -16,7 +16,7 @@ metadata:
spec:
activeDeadlineSeconds: 14400
backoffLimit: 0
- ttlSecondsAfterFinished: 3600
+ ttlSecondsAfterFinished: 172800
template:
metadata:
labels:
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
index b6ca8a2cefe..cf16c49c5db 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
@@ -16,7 +16,7 @@ metadata:
spec:
activeDeadlineSeconds: 14400
backoffLimit: 0
- ttlSecondsAfterFinished: 3600
+ ttlSecondsAfterFinished: 172800
template:
metadata:
labels:
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
index 8ecdaf50b01..d72d0ef37b0 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
@@ -16,7 +16,7 @@ metadata:
spec:
activeDeadlineSeconds: 14400
backoffLimit: 0
- ttlSecondsAfterFinished: 3600
+ ttlSecondsAfterFinished: 172800
template:
metadata:
labels:
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
index 547887e9084..a230ac913a6 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
@@ -16,7 +16,7 @@ metadata:
spec:
activeDeadlineSeconds: 14400
backoffLimit: 0
- ttlSecondsAfterFinished: 3600
+ ttlSecondsAfterFinished: 172800
template:
metadata:
labels:
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml
index ecd9416c563..e46de133788 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml
@@ -104,4 +104,4 @@ spec:
name: "graveyard"
- emptyDir: {}
name: "kubexit"
- ttlSecondsAfterFinished: 3600
+ ttlSecondsAfterFinished: 172800
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml
index 7afc393c56a..f270368fb55 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml
@@ -57,4 +57,4 @@ spec:
cpu: "1000m"
memory: "2400000000"
restartPolicy: "Never"
- ttlSecondsAfterFinished: 3600
\ No newline at end of file
+ ttlSecondsAfterFinished: 172800
\ No newline at end of file
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java
index 088169d3a53..f4926864dcb 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java
@@ -23,15 +23,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Optional;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.TaskRunner;
-/* This class was added for mm-less ingestion in order to let the peon manage
its own location lifecycle by submitting
-actions to the overlord. https://github.com/apache/druid/pull/15133 moved this
location logic to the overlord itself
-so this Action is no longer needed. For backwards compatibility with old
peons, this class was left in but can be deprecated
-for a later druid release.
-*/
-@Deprecated
public class UpdateLocationAction implements TaskAction<Void>
{
@JsonIgnore
@@ -62,6 +58,10 @@ public class UpdateLocationAction implements TaskAction<Void>
@Override
public Void perform(Task task, TaskActionToolbox toolbox)
{
+ Optional<TaskRunner> taskRunner = toolbox.getTaskRunner();
+ if (taskRunner.isPresent()) {
+ taskRunner.get().updateLocation(task, taskLocation);
+ }
return null;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index cd17160f3aa..e8770c512dc 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -25,11 +25,13 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.UpdateLocationAction;
import org.apache.druid.indexing.common.actions.UpdateStatusAction;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
@@ -40,12 +42,14 @@ import
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.segment.indexing.BatchIOConfig;
+import org.apache.druid.server.DruidNode;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
@@ -151,6 +155,11 @@ public abstract class AbstractTask implements Task
FileUtils.mkdirp(attemptDir);
reportsFile = new File(attemptDir, "report.json");
statusFile = new File(attemptDir, "status.json");
+ InetAddress hostName = InetAddress.getLocalHost();
+ DruidNode node = toolbox.getTaskExecutorNode();
+ toolbox.getTaskActionClient().submit(new
UpdateLocationAction(TaskLocation.create(
+ hostName.getHostAddress(), node.getPlaintextPort(),
node.getTlsPort(), node.isEnablePlaintextPort()
+ )));
}
log.debug("Task setup complete");
return null;
@@ -203,6 +212,7 @@ public abstract class AbstractTask implements Task
// report back to the overlord
UpdateStatusAction status = new UpdateStatusAction("", taskStatusToReport);
toolbox.getTaskActionClient().submit(status);
+ toolbox.getTaskActionClient().submit(new
UpdateLocationAction(TaskLocation.unknown()));
if (reportsFile != null && reportsFile.exists()) {
toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
index 99b0c05b832..ac1fd124ef5 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
@@ -141,6 +141,11 @@ public interface TaskRunner
// do nothing
}
+ default void updateLocation(Task task, TaskLocation location)
+ {
+ // do nothing
+ }
+
/**
* The maximum number of tasks this TaskRunner can run concurrently.
* Can return -1 if this method is not implemented or capacity can't be
found.
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java
new file mode 100644
index 00000000000..83aeb382dc0
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.google.common.base.Optional;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class UpdateLocationActionTest
+{
+ @Test
+ public void testFlow() throws UnknownHostException
+ {
+ // get my task location
+ InetAddress hostName = InetAddress.getLocalHost();
+ TaskLocation myLocation = TaskLocation.create(hostName.getHostAddress(),
1, 2);
+ UpdateLocationAction action = new UpdateLocationAction(myLocation);
+ Task task = NoopTask.create();
+ TaskActionToolbox toolbox = mock(TaskActionToolbox.class);
+ TaskRunner runner = mock(TaskRunner.class);
+ when(toolbox.getTaskRunner()).thenReturn(Optional.of(runner));
+ action.perform(task, toolbox);
+ verify(runner, times(1)).updateLocation(eq(task), eq(myLocation));
+ }
+
+ @Test
+ public void testWithNoTaskRunner() throws UnknownHostException
+ {
+ // get my task location
+ InetAddress hostName = InetAddress.getLocalHost();
+ TaskLocation myLocation = TaskLocation.create(hostName.getHostAddress(),
1, 2);
+ UpdateLocationAction action = new UpdateLocationAction(myLocation);
+ Task task = NoopTask.create();
+ TaskActionToolbox toolbox = mock(TaskActionToolbox.class);
+ TaskRunner runner = mock(TaskRunner.class);
+ when(toolbox.getTaskRunner()).thenReturn(Optional.absent());
+ action.perform(task, toolbox);
+ verify(runner, never()).updateStatus(any(), any());
+ }
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
index bcd2f086fd0..ba210fee228 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
@@ -109,7 +109,7 @@ public class AbstractTaskTest
task.run(toolbox);
// call it 3 times, once to update location in setup, then one for status
and location in cleanup
- Mockito.verify(taskActionClient, times(1)).submit(any());
+ Mockito.verify(taskActionClient, times(3)).submit(any());
verify(pusher, times(1)).pushTaskReports(eq("myID"), any());
verify(pusher, times(1)).pushTaskStatus(eq("myID"), any());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]