This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9d4cc501f7 return task status reported by peon (#14040)
9d4cc501f7 is described below
commit 9d4cc501f73a03543e19081c332331fccac67562
Author: Nicholas Lippis <[email protected]>
AuthorDate: Mon Apr 24 15:05:39 2023 -0400
return task status reported by peon (#14040)
* return task status reported by peon
* Write TaskStatus to file in AbstractTask.cleanUp
* Get TaskStatus from task log
* Fix merge conflicts in AbstractTaskTest
* Add unit tests for TaskLogPusher, TaskLogStreamer, NoopTaskLogs to
satisfy code coverage
* Add license headers
* Fix style
* Remove unknown exception declarations
---
.../druid/k8s/overlord/K8sOverlordModule.java | 2 +
.../druid/k8s/overlord/KubernetesTaskRunner.java | 62 +++---
.../k8s/overlord/KubernetesTaskRunnerFactory.java | 11 +-
.../overlord/KubernetesTaskRunnerFactoryTest.java | 22 +-
.../k8s/overlord/KubernetesTaskRunnerTest.java | 227 ++++++++++++++++++---
.../apache/druid/storage/azure/AzureTaskLogs.java | 19 ++
.../druid/storage/azure/AzureTaskLogsTest.java | 95 +++++++++
.../druid/storage/google/GoogleTaskLogs.java | 20 ++
.../druid/storage/google/GoogleTaskLogsTest.java | 50 +++++
.../druid/storage/hdfs/tasklog/HdfsTaskLogs.java | 25 +++
.../indexing/common/tasklogs/HdfsTaskLogsTest.java | 17 ++
.../org/apache/druid/storage/s3/S3TaskLogs.java | 15 ++
.../apache/druid/storage/s3/S3TaskLogsTest.java | 52 +++++
.../druid/indexing/common/task/AbstractTask.java | 77 ++++---
.../indexing/common/tasklogs/FileTaskLogs.java | 20 ++
.../indexing/common/task/AbstractTaskTest.java | 18 +-
.../task/batch/parallel/TaskMonitorTest.java | 2 +-
.../indexing/common/tasklogs/FileTaskLogsTest.java | 23 +++
.../overlord/SingleTaskBackgroundRunnerTest.java | 14 +-
.../druid/indexing/overlord/TaskQueueTest.java | 12 +-
.../org/apache/druid/tasklogs/NoopTaskLogs.java | 6 +
.../org/apache/druid/tasklogs/TaskLogPusher.java | 4 +
.../org/apache/druid/tasklogs/TaskLogStreamer.java | 5 +
.../apache/druid/tasklogs/NoopTaskLogsTest.java} | 17 +-
.../apache/druid/tasklogs/TaskLogPusherTest.java} | 25 ++-
.../druid/tasklogs/TaskLogStreamerTest.java} | 29 +--
26 files changed, 720 insertions(+), 149 deletions(-)
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/K8sOverlordModule.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/K8sOverlordModule.java
index c176330b15..b68b4d07a2 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/K8sOverlordModule.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/K8sOverlordModule.java
@@ -37,6 +37,7 @@ import org.apache.druid.initialization.DruidModule;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogKiller;
import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.tasklogs.TaskLogs;
@LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
@@ -78,6 +79,7 @@ public class K8sOverlordModule implements DruidModule
binder.bind(FileTaskLogs.class).in(LazySingleton.class);
binder.bind(TaskLogPusher.class).to(TaskLogs.class);
+ binder.bind(TaskLogStreamer.class).to(TaskLogs.class);
binder.bind(TaskLogKiller.class).to(TaskLogs.class);
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 2569b40b2a..a360dc09f0 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -19,6 +19,7 @@
package org.apache.druid.k8s.overlord;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
@@ -31,6 +32,7 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.netty.util.SuppressForbidden;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@@ -44,6 +46,7 @@ import
org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
@@ -57,8 +60,8 @@ import
org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import
org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.k8s.overlord.common.TaskAdapter;
-import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.apache.druid.tasklogs.TaskLogs;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;
@@ -66,6 +69,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -73,6 +77,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
@@ -101,28 +106,31 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new
ConcurrentHashMap<>();
protected final TaskAdapter adapter;
+ protected final KubernetesPeonClient client;
+ private final ObjectMapper mapper;
private final KubernetesTaskRunnerConfig k8sConfig;
private final TaskQueueConfig taskQueueConfig;
- private final TaskLogPusher taskLogPusher;
+ private final TaskLogs taskLogs;
private final ListeningExecutorService exec;
- private final KubernetesPeonClient client;
private final HttpClient httpClient;
public KubernetesTaskRunner(
+ ObjectMapper mapper,
TaskAdapter adapter,
KubernetesTaskRunnerConfig k8sConfig,
TaskQueueConfig taskQueueConfig,
- TaskLogPusher taskLogPusher,
+ TaskLogs taskLogs,
KubernetesPeonClient client,
HttpClient httpClient
)
{
+ this.mapper = mapper;
this.adapter = adapter;
this.k8sConfig = k8sConfig;
this.taskQueueConfig = taskQueueConfig;
- this.taskLogPusher = taskLogPusher;
+ this.taskLogs = taskLogs;
this.client = client;
this.httpClient = httpClient;
this.cleanupExecutor = Executors.newScheduledThreadPool(1);
@@ -178,20 +186,7 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
completedPhase = monitorJob(k8sTaskId);
}
}
- TaskStatus status;
- if (PeonPhase.SUCCEEDED.equals(completedPhase.getPhase())) {
- status = TaskStatus.success(task.getId());
- } else if (completedPhase.getJob() == null) {
- status = TaskStatus.failure(
- task.getId(),
- "K8s Job for task disappeared before completion: " +
k8sTaskId
- );
- } else {
- status = TaskStatus.failure(
- task.getId(),
- "Task failed: " + k8sTaskId
- );
- }
+ TaskStatus status = getTaskStatus(k8sTaskId, completedPhase);
if (completedPhase.getJobDuration().isPresent()) {
status =
status.withDuration(completedPhase.getJobDuration().get());
}
@@ -210,7 +205,7 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
if (logStream.isPresent()) {
FileUtils.copyInputStreamToFile(logStream.get(),
log.toFile());
}
- taskLogPusher.pushTaskLog(task.getId(), log.toFile());
+ taskLogs.pushTaskLog(task.getId(), log.toFile());
}
finally {
Files.deleteIfExists(log);
@@ -243,10 +238,31 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
);
}
+ private TaskStatus getTaskStatus(K8sTaskId task, JobResponse jobResponse)
throws IOException
+ {
+ Optional<InputStream> maybeTaskStatusStream =
taskLogs.streamTaskStatus(task.getOriginalTaskId());
+ if (maybeTaskStatusStream.isPresent()) {
+ String taskStatus = IOUtils.toString(maybeTaskStatusStream.get(),
StandardCharsets.UTF_8);
+ return mapper.readValue(taskStatus, TaskStatus.class);
+ } else if (PeonPhase.SUCCEEDED.equals(jobResponse.getPhase())) {
+ // fallback to behavior before the introduction of task status streaming
for backwards compatibility
+ return TaskStatus.success(task.getOriginalTaskId());
+ } else if (Objects.isNull(jobResponse.getJob())) {
+ return TaskStatus.failure(
+ task.getOriginalTaskId(),
+ StringUtils.format("Task [%s] failed kubernetes job disappeared
before completion", task.getOriginalTaskId())
+ );
+ } else {
+ return TaskStatus.failure(
+ task.getOriginalTaskId(),
+ StringUtils.format("Task [%s] failed", task.getOriginalTaskId())
+ );
+ }
+ }
+
@Override
public void updateStatus(Task task, TaskStatus status)
{
- log.info("Updating task: %s with status %s", task.getId(), status);
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}
@@ -508,8 +524,8 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
}
boolean tlsEnabled = Boolean.parseBoolean(
mainPod.getMetadata()
- .getAnnotations()
- .getOrDefault(DruidK8sConstants.TLS_ENABLED, "false"));
+ .getAnnotations()
+ .getOrDefault(DruidK8sConstants.TLS_ENABLED, "false"));
return TaskLocation.create(
mainPod.getStatus().getPodIP(),
DruidK8sConstants.PORT,
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
index 4b9873499c..fdee5702c5 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
@@ -41,7 +41,7 @@ import
org.apache.druid.k8s.overlord.common.SingleContainerTaskAdapter;
import org.apache.druid.k8s.overlord.common.TaskAdapter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
-import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogs;
import java.util.Locale;
import java.util.Properties;
@@ -54,7 +54,7 @@ public class KubernetesTaskRunnerFactory implements
TaskRunnerFactory<Kubernetes
private final KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
private final StartupLoggingConfig startupLoggingConfig;
private final TaskQueueConfig taskQueueConfig;
- private final TaskLogPusher taskLogPusher;
+ private final TaskLogs taskLogs;
private final DruidNode druidNode;
private final TaskConfig taskConfig;
private final Properties properties;
@@ -68,7 +68,7 @@ public class KubernetesTaskRunnerFactory implements
TaskRunnerFactory<Kubernetes
KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
StartupLoggingConfig startupLoggingConfig,
@JacksonInject TaskQueueConfig taskQueueConfig,
- TaskLogPusher taskLogPusher,
+ TaskLogs taskLogs,
@Self DruidNode druidNode,
TaskConfig taskConfig,
Properties properties
@@ -80,7 +80,7 @@ public class KubernetesTaskRunnerFactory implements
TaskRunnerFactory<Kubernetes
this.kubernetesTaskRunnerConfig = kubernetesTaskRunnerConfig;
this.startupLoggingConfig = startupLoggingConfig;
this.taskQueueConfig = taskQueueConfig;
- this.taskLogPusher = taskLogPusher;
+ this.taskLogs = taskLogs;
this.druidNode = druidNode;
this.taskConfig = taskConfig;
this.properties = properties;
@@ -100,10 +100,11 @@ public class KubernetesTaskRunnerFactory implements
TaskRunnerFactory<Kubernetes
}
runner = new KubernetesTaskRunner(
+ smileMapper,
buildTaskAdapter(client),
kubernetesTaskRunnerConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
new DruidKubernetesPeonClient(client,
kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs),
httpClient
);
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
index d00c766db8..8badaafd3a 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
@@ -31,7 +31,7 @@ import
org.apache.druid.k8s.overlord.common.SingleContainerTaskAdapter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.NoopTaskLogs;
-import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogs;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -45,7 +45,7 @@ public class KubernetesTaskRunnerFactoryTest
private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
private StartupLoggingConfig startupLoggingConfig;
private TaskQueueConfig taskQueueConfig;
- private TaskLogPusher taskLogPusher;
+ private TaskLogs taskLogs;
private DruidNode druidNode;
private TaskConfig taskConfig;
private Properties properties;
@@ -62,7 +62,7 @@ public class KubernetesTaskRunnerFactoryTest
null,
null
);
- taskLogPusher = new NoopTaskLogs();
+ taskLogs = new NoopTaskLogs();
druidNode = new DruidNode(
"test",
"",
@@ -85,7 +85,7 @@ public class KubernetesTaskRunnerFactoryTest
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
druidNode,
taskConfig,
properties
@@ -106,7 +106,7 @@ public class KubernetesTaskRunnerFactoryTest
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
druidNode,
taskConfig,
properties
@@ -129,7 +129,7 @@ public class KubernetesTaskRunnerFactoryTest
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
druidNode,
taskConfig,
properties
@@ -153,7 +153,7 @@ public class KubernetesTaskRunnerFactoryTest
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
druidNode,
taskConfig,
props
@@ -179,7 +179,7 @@ public class KubernetesTaskRunnerFactoryTest
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
druidNode,
taskConfig,
props
@@ -206,7 +206,7 @@ public class KubernetesTaskRunnerFactoryTest
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
druidNode,
taskConfig,
props
@@ -230,7 +230,7 @@ public class KubernetesTaskRunnerFactoryTest
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
druidNode,
taskConfig,
props
@@ -257,7 +257,7 @@ public class KubernetesTaskRunnerFactoryTest
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
druidNode,
taskConfig,
props
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 6c651f0cbd..c31a2a42f8 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -49,6 +49,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
@@ -62,7 +63,7 @@ import
org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
-import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogs;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
@@ -96,7 +97,7 @@ public class KubernetesTaskRunnerTest
private StartupLoggingConfig startupLoggingConfig;
private ObjectMapper jsonMapper;
private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
- private TaskLogPusher taskLogPusher;
+ private TaskLogs taskLogs;
private DruidNode druidNode;
@Before
@@ -116,7 +117,7 @@ public class KubernetesTaskRunnerTest
kubernetesTaskRunnerConfig.javaOptsArray =
Collections.singletonList("-Xmx2g");
taskQueueConfig = new TaskQueueConfig(1, Period.millis(1),
Period.millis(1), Period.millis(1));
startupLoggingConfig = new StartupLoggingConfig();
- taskLogPusher = mock(TaskLogPusher.class);
+ taskLogs = mock(TaskLogs.class);
druidNode = mock(DruidNode.class);
when(druidNode.isEnableTlsPort()).thenReturn(false);
}
@@ -156,22 +157,30 @@ public class KubernetesTaskRunnerTest
when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
+ TaskStatus taskStatus = TaskStatus.success(task.getId());
+
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream(
+ jsonMapper.writeValueAsString(taskStatus),
+ StandardCharsets.UTF_8))
+ );
+
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+ jsonMapper,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
peonClient,
null
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
ListenableFuture<TaskStatus> future = spyRunner.run(task);
- future.get();
+ TaskStatus actualTaskStatus = future.get();
+ Assert.assertEquals(taskStatus, actualTaskStatus);
// we should never launch the job here, one exists
verify(peonClient, never()).launchJobAndWaitForStart(isA(Job.class),
anyLong(), isA(TimeUnit.class));
verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId));
- verify(spyRunner, times(1)).updateStatus(eq(task),
eq(TaskStatus.success(task.getId())));
+ verify(spyRunner, times(1)).updateStatus(eq(task), eq(taskStatus));
}
@Test
@@ -208,22 +217,156 @@ public class KubernetesTaskRunnerTest
job,
PeonPhase.SUCCEEDED
));
+
+ TaskStatus taskStatus = TaskStatus.success(task.getId());
+
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream(
+ jsonMapper.writeValueAsString(taskStatus),
+ StandardCharsets.UTF_8))
+ );
+
when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+ jsonMapper,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
peonClient,
null
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
+ ListenableFuture<TaskStatus> future = spyRunner.run(task);
+ TaskStatus actualTaskStatus = future.get();
+ Assert.assertEquals(taskStatus, actualTaskStatus);
+ // we should never launch the job here, one exists
+ verify(peonClient, times(1)).launchJobAndWaitForStart(isA(Job.class),
anyLong(), isA(TimeUnit.class));
+ verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId));
+ verify(spyRunner, times(1)).updateStatus(eq(task), eq(taskStatus));
+ }
+
+ @Test
+ public void
test_run_withSuccessfulJobAndWithoutStatusFile_returnsSucessfulTask() throws
Exception
+ {
+ Task task = makeTask();
+ K8sTaskId k8sTaskId = new K8sTaskId(task.getId());
+
+ Job job = mock(Job.class);
+ ObjectMeta jobMetadata = mock(ObjectMeta.class);
+ when(jobMetadata.getName()).thenReturn(k8sTaskId.getK8sTaskId());
+ JobStatus status = mock(JobStatus.class);
+ when(status.getActive()).thenReturn(1).thenReturn(null);
+ when(job.getStatus()).thenReturn(status);
+ when(job.getMetadata()).thenReturn(jobMetadata);
+
+ Pod peonPod = mock(Pod.class);
+ ObjectMeta metadata = mock(ObjectMeta.class);
+ when(metadata.getName()).thenReturn("peonPodName");
+ when(peonPod.getMetadata()).thenReturn(metadata);
+ PodStatus podStatus = mock(PodStatus.class);
+ when(podStatus.getPodIP()).thenReturn("SomeIP");
+ when(peonPod.getStatus()).thenReturn(podStatus);
+
+ K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
+ when(adapter.fromTask(eq(task))).thenReturn(job);
+
+ DruidKubernetesPeonClient peonClient =
mock(DruidKubernetesPeonClient.class);
+
+
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null));
+ when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(),
isA(TimeUnit.class))).thenReturn(peonPod);
+ when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
+ when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(),
isA(TimeUnit.class))).thenReturn(new JobResponse(
+ job,
+ PeonPhase.SUCCEEDED
+ ));
+
+
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.absent());
+
+ when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
+ when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
+
+ KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+ jsonMapper,
+ adapter,
+ kubernetesTaskRunnerConfig,
+ taskQueueConfig,
+ taskLogs,
+ peonClient,
+ null
+ );
+ KubernetesTaskRunner spyRunner = spy(taskRunner);
+
+ ListenableFuture<TaskStatus> future = spyRunner.run(task);
+ TaskStatus actualTaskStatus = future.get();
+ Assert.assertTrue(actualTaskStatus.isSuccess());
+
+ // we should never launch the job here, one exists
+ verify(peonClient, times(1)).launchJobAndWaitForStart(isA(Job.class),
anyLong(), isA(TimeUnit.class));
+ verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId));
+ verify(spyRunner, times(1)).updateStatus(eq(task), eq(actualTaskStatus));
+ }
+
+ @Test
+ public void test_run_withFailedJob_returnsFailedTask() throws Exception
+ {
+ Task task = makeTask();
+ K8sTaskId k8sTaskId = new K8sTaskId(task.getId());
+
+ Job job = mock(Job.class);
+ ObjectMeta jobMetadata = mock(ObjectMeta.class);
+ when(jobMetadata.getName()).thenReturn(k8sTaskId.getK8sTaskId());
+ JobStatus status = mock(JobStatus.class);
+ when(status.getActive()).thenReturn(1).thenReturn(null);
+ when(job.getStatus()).thenReturn(status);
+ when(job.getMetadata()).thenReturn(jobMetadata);
+
+ Pod peonPod = mock(Pod.class);
+ ObjectMeta metadata = mock(ObjectMeta.class);
+ when(metadata.getName()).thenReturn("peonPodName");
+ when(peonPod.getMetadata()).thenReturn(metadata);
+ PodStatus podStatus = mock(PodStatus.class);
+ when(podStatus.getPodIP()).thenReturn("SomeIP");
+ when(peonPod.getStatus()).thenReturn(podStatus);
+
+ K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
+ when(adapter.fromTask(eq(task))).thenReturn(job);
+
+ DruidKubernetesPeonClient peonClient =
mock(DruidKubernetesPeonClient.class);
+
+
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null));
+ when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(),
isA(TimeUnit.class))).thenReturn(peonPod);
+ when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
+ when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(),
isA(TimeUnit.class))).thenReturn(new JobResponse(
+ job,
+ PeonPhase.FAILED
+ ));
+
+
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.absent());
+
+ when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
+ when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
+
+ KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+ jsonMapper,
+ adapter,
+ kubernetesTaskRunnerConfig,
+ taskQueueConfig,
+ taskLogs,
+ peonClient,
+ null
+ );
+ KubernetesTaskRunner spyRunner = spy(taskRunner);
ListenableFuture<TaskStatus> future = spyRunner.run(task);
- future.get();
+ TaskStatus actualTaskStatus = future.get();
+ Assert.assertTrue(actualTaskStatus.isFailure());
+ Assert.assertEquals(
+ StringUtils.format("Task [%s] failed", task.getId()),
+ actualTaskStatus.getErrorMsg()
+ );
+
// we should never launch the job here, one exists
verify(peonClient, times(1)).launchJobAndWaitForStart(isA(Job.class),
anyLong(), isA(TimeUnit.class));
verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId));
@@ -233,7 +376,7 @@ public class KubernetesTaskRunnerTest
DruidK8sConstants.TLS_PORT,
druidNode.isEnableTlsPort()
);
- verify(spyRunner, times(1)).updateStatus(eq(task),
eq(TaskStatus.success(task.getId(), expectedTaskLocation)));
+ verify(spyRunner, times(1)).updateStatus(eq(task), eq(actualTaskStatus));
}
@Test
@@ -274,14 +417,22 @@ public class KubernetesTaskRunnerTest
job,
PeonPhase.SUCCEEDED
));
+
+ TaskStatus taskStatus = TaskStatus.success(task.getId());
+
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream(
+ jsonMapper.writeValueAsString(taskStatus),
+ StandardCharsets.UTF_8))
+ );
+
when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+ jsonMapper,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
peonClient,
null
);
@@ -337,14 +488,22 @@ public class KubernetesTaskRunnerTest
job,
PeonPhase.SUCCEEDED
));
+
+ TaskStatus taskStatus = TaskStatus.success(task.getId());
+
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream(
+ jsonMapper.writeValueAsString(taskStatus),
+ StandardCharsets.UTF_8))
+ );
+
when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+ jsonMapper,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
peonClient,
null
);
@@ -411,10 +570,11 @@ public class KubernetesTaskRunnerTest
);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
- adapter,
+ jsonMapper,
+ null,
kubernetesTaskRunnerConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
peonClient,
httpClient
);
@@ -432,10 +592,11 @@ public class KubernetesTaskRunnerTest
Task task = makeTask();
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+ jsonMapper,
mock(K8sTaskAdapter.class),
kubernetesTaskRunnerConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
mock(DruidKubernetesPeonClient.class),
mock(HttpClient.class)
);
@@ -484,10 +645,11 @@ public class KubernetesTaskRunnerTest
when(future.get()).thenThrow(InterruptedException.class);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
- adapter,
+ jsonMapper,
+ null,
kubernetesTaskRunnerConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
peonClient,
httpClient
);
@@ -539,10 +701,11 @@ public class KubernetesTaskRunnerTest
when(future.get()).thenThrow(InterruptedException.class);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
- adapter,
+ jsonMapper,
+ null,
kubernetesTaskRunnerConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
peonClient,
httpClient
);
@@ -594,10 +757,11 @@ public class KubernetesTaskRunnerTest
when(future.get()).thenThrow(ExecutionException.class);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
- adapter,
+ jsonMapper,
+ null,
kubernetesTaskRunnerConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
peonClient,
httpClient
);
@@ -619,10 +783,11 @@ public class KubernetesTaskRunnerTest
when(peonClient.getMainJobPod(any())).thenReturn(null).thenReturn(pod);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+ jsonMapper,
mock(K8sTaskAdapter.class),
kubernetesTaskRunnerConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
peonClient,
null
);
@@ -647,10 +812,11 @@ public class KubernetesTaskRunnerTest
Period.millis(1)
);
assertThrows(IllegalArgumentException.class, () -> new
KubernetesTaskRunner(
+ jsonMapper,
mock(K8sTaskAdapter.class),
kubernetesTaskRunnerConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
mock(DruidKubernetesPeonClient.class),
null
));
@@ -724,6 +890,8 @@ public class KubernetesTaskRunnerTest
DruidKubernetesPeonClient peonClient =
mock(DruidKubernetesPeonClient.class);
+
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.absent());
+
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job));
when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
@@ -736,18 +904,19 @@ public class KubernetesTaskRunnerTest
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+ jsonMapper,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
- taskLogPusher,
+ taskLogs,
peonClient,
null
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
ListenableFuture<TaskStatus> future = spyRunner.run(task);
- TaskStatus taskStatus = future.get();
- Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
- Assert.assertEquals("K8s Job for task disappeared before completion: [
k8sTaskId, k8staskid]", taskStatus.getErrorMsg());
+ TaskStatus taskStatusResponse = future.get();
+ Assert.assertEquals(TaskState.FAILED, taskStatusResponse.getStatusCode());
+ Assert.assertEquals("Task [k8sTaskId] failed kubernetes job disappeared
before completion", taskStatusResponse.getErrorMsg());
}
@@ -762,9 +931,9 @@ public class KubernetesTaskRunnerTest
null,
null,
ImmutableMap.of("druid.indexer.runner.javaOpts", "abc",
-
"druid.indexer.fork.property.druid.processing.buffer.sizeBytes", "2048",
- "druid.peon.pod.cpu", "1",
- "druid.peon.pod.memory", "2G"
+ "druid.indexer.fork.property.druid.processing.buffer.sizeBytes",
"2048",
+ "druid.peon.pod.cpu", "1",
+ "druid.peon.pod.memory", "2G"
)
);
}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
index 678a43e5db..9bfda5ab34 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
@@ -82,6 +82,14 @@ public class AzureTaskLogs implements TaskLogs
pushTaskFile(reportFile, taskKey);
}
+ @Override
+ public void pushTaskStatus(String taskid, File statusFile)
+ {
+ final String taskKey = getTaskStatusKey(taskid);
+ log.info("Pushing task status %s to: %s", statusFile, taskKey);
+ pushTaskFile(statusFile, taskKey);
+ }
+
private void pushTaskFile(final File logFile, String taskKey)
{
try {
@@ -110,6 +118,12 @@ public class AzureTaskLogs implements TaskLogs
return streamTaskFile(taskid, 0, getTaskReportsKey(taskid));
}
+ @Override
+ public Optional<InputStream> streamTaskStatus(String taskid) throws
IOException
+ {
+ return streamTaskFile(taskid, 0, getTaskStatusKey(taskid));
+ }
+
private Optional<InputStream> streamTaskFile(final String taskid, final long
offset, String taskKey)
throws IOException
{
@@ -154,6 +168,11 @@ public class AzureTaskLogs implements TaskLogs
return StringUtils.format("%s/%s/report.json", config.getPrefix(), taskid);
}
+ private String getTaskStatusKey(String taskid)
+ {
+ return StringUtils.format("%s/%s/status.json", config.getPrefix(), taskid);
+ }
+
@Override
public void killAll() throws IOException
{
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
index 5313c1d5f5..297545e4ca 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
@@ -155,6 +155,28 @@ public class AzureTaskLogsTest extends EasyMockSupport
}
}
+ @Test
+ public void test_PushTaskStatus_uploadsBlob() throws Exception
+ {
+ final File tmpDir = FileUtils.createTempDir();
+
+ try {
+ final File logFile = new File(tmpDir, "status.json");
+
+ azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID +
"/status.json");
+ EasyMock.expectLastCall();
+
+ replayAll();
+
+ azureTaskLogs.pushTaskStatus(TASK_ID, logFile);
+
+ verifyAll();
+ }
+ finally {
+ FileUtils.deleteDirectory(tmpDir);
+ }
+ }
+
@Test(expected = RuntimeException.class)
public void test_PushTaskReports_exception_rethrowsException() throws
Exception
{
@@ -323,6 +345,79 @@ public class AzureTaskLogsTest extends EasyMockSupport
verifyAll();
}
+ @Test
+ public void test_streamTaskStatus_blobExists_succeeds() throws Exception
+ {
+ final String taskStatus = "{}";
+
+ final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
+ EasyMock.expect(azureStorage.getBlobExists(CONTAINER,
blobPath)).andReturn(true);
+ EasyMock.expect(azureStorage.getBlobLength(CONTAINER,
blobPath)).andReturn((long) taskStatus.length());
+ EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER,
blobPath)).andReturn(
+ new ByteArrayInputStream(taskStatus.getBytes(StandardCharsets.UTF_8)));
+
+
+ replayAll();
+
+ final Optional<InputStream> stream =
azureTaskLogs.streamTaskStatus(TASK_ID);
+
+ final StringWriter writer = new StringWriter();
+ IOUtils.copy(stream.get(), writer, "UTF-8");
+ Assert.assertEquals(taskStatus, writer.toString()); // expected value first, then actual
+
+ verifyAll();
+ }
+
+ @Test
+ public void test_streamTaskStatus_blobDoesNotExist_returnsAbsent() throws
Exception
+ {
+ final String blobPath = PREFIX + "/" + TASK_ID_NOT_FOUND + "/status.json";
+ EasyMock.expect(azureStorage.getBlobExists(CONTAINER,
blobPath)).andReturn(false);
+
+ replayAll();
+
+ final Optional<InputStream> stream =
azureTaskLogs.streamTaskStatus(TASK_ID_NOT_FOUND);
+
+
+ Assert.assertFalse(stream.isPresent());
+
+ verifyAll();
+ }
+
+ @Test(expected = IOException.class)
+ public void
test_streamTaskStatus_exceptionWhenGettingStream_throwsException() throws
Exception
+ {
+ final String taskStatus = "{}";
+
+ final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
+ EasyMock.expect(azureStorage.getBlobExists(CONTAINER,
blobPath)).andReturn(true);
+ EasyMock.expect(azureStorage.getBlobLength(CONTAINER,
blobPath)).andReturn((long) taskStatus.length());
+ EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER,
blobPath)).andThrow(
+ new URISyntaxException("", ""));
+
+
+ replayAll();
+
+ final Optional<InputStream> stream =
azureTaskLogs.streamTaskStatus(TASK_ID);
+
+ final StringWriter writer = new StringWriter();
+ IOUtils.copy(stream.get(), writer, "UTF-8");
+ verifyAll();
+ }
+
+ @Test(expected = IOException.class)
+ public void
test_streamTaskStatus_exceptionWhenCheckingBlobExistence_throwsException()
throws Exception
+ {
+ final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
+ EasyMock.expect(azureStorage.getBlobExists(CONTAINER,
blobPath)).andThrow(new URISyntaxException("", ""));
+
+ replayAll();
+
+ azureTaskLogs.streamTaskStatus(TASK_ID);
+
+ verifyAll();
+ }
+
@Test
public void test_killAll_noException_deletesAllTaskLogs() throws Exception
{
diff --git
a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
index fcdee4039b..ae4024172a 100644
---
a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
+++
b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
@@ -74,6 +74,14 @@ public class GoogleTaskLogs implements TaskLogs
pushTaskFile(reportFile, taskKey);
}
+ @Override
+ public void pushTaskStatus(String taskid, File statusFile) throws IOException
+ {
+ final String taskKey = getTaskStatusKey(taskid);
+ LOG.info("Pushing task status %s to: %s", statusFile, taskKey);
+ pushTaskFile(statusFile, taskKey);
+ }
+
private void pushTaskFile(final File logFile, final String taskKey) throws
IOException
{
try (final InputStream fileStream =
Files.newInputStream(logFile.toPath())) {
@@ -115,6 +123,13 @@ public class GoogleTaskLogs implements TaskLogs
return streamTaskFile(taskid, 0, taskKey);
}
+ @Override
+ public Optional<InputStream> streamTaskStatus(String taskid) throws
IOException
+ {
+ final String taskKey = getTaskStatusKey(taskid);
+ return streamTaskFile(taskid, 0, taskKey);
+ }
+
private Optional<InputStream> streamTaskFile(final String taskid, final long
offset, String taskKey)
throws IOException
{
@@ -156,6 +171,11 @@ public class GoogleTaskLogs implements TaskLogs
return config.getPrefix() + "/" + taskid.replace(':', '_') +
".report.json";
}
+ private String getTaskStatusKey(String taskid)
+ {
+ return config.getPrefix() + "/" + taskid.replace(':', '_') +
".status.json";
+ }
+
@Override
public void killAll() throws IOException
{
diff --git
a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
index d8b7c61cfc..9bfe2706f8 100644
---
a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
+++
b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
@@ -109,6 +109,35 @@ public class GoogleTaskLogsTest extends EasyMockSupport
}
}
+ @Test
+ public void testPushTaskStatus() throws Exception
+ {
+ final File tmpDir = FileUtils.createTempDir();
+
+ try {
+ final File statusFile = new File(tmpDir, "status.json");
+ try (BufferedWriter output = Files.newBufferedWriter(statusFile.toPath(),
StandardCharsets.UTF_8)) {
+ output.write("{}");
+ }
+
+ storage.insert(
+ EasyMock.eq(BUCKET),
+ EasyMock.eq(PREFIX + "/" + TASKID + ".status.json"), // key built by getTaskStatusKey
+ EasyMock.anyObject(InputStreamContent.class)
+ );
+ EasyMock.expectLastCall();
+
+ replayAll();
+
+ googleTaskLogs.pushTaskStatus(TASKID, statusFile); // exercise the status path, not pushTaskLog
+
+ verifyAll();
+ }
+ finally {
+ FileUtils.deleteDirectory(tmpDir);
+ }
+ }
+
@Test
public void testStreamTaskLogWithoutOffset() throws Exception
{
@@ -177,6 +206,27 @@ public class GoogleTaskLogsTest extends EasyMockSupport
verifyAll();
}
+ @Test
+ public void testStreamTaskStatus() throws Exception
+ {
+ final String taskStatus = "{}";
+
+ final String logPath = PREFIX + "/" + TASKID + ".status.json";
+ EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true);
+ EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long)
taskStatus.length());
+ EasyMock.expect(storage.get(BUCKET, logPath, 0)).andReturn(new
ByteArrayInputStream(StringUtils.toUtf8(taskStatus)));
+
+ replayAll();
+
+ final Optional<InputStream> stream =
googleTaskLogs.streamTaskStatus(TASKID);
+
+ final StringWriter writer = new StringWriter();
+ IOUtils.copy(stream.get(), writer, "UTF-8");
+ Assert.assertEquals(taskStatus, writer.toString()); // expected value first, then actual
+
+ verifyAll();
+ }
+
@Test
public void test_killAll_noException_deletesAllTaskLogs() throws IOException
{
diff --git
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
index 6026a3e3cb..f89b0fafde 100644
---
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
+++
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
@@ -75,6 +75,15 @@ public class HdfsTaskLogs implements TaskLogs
log.info("Wrote task reports to: %s", path);
}
+ @Override
+ public void pushTaskStatus(String taskId, File statusFile) throws IOException
+ {
+ final Path path = getTaskStatusFileFromId(taskId);
+ log.info("Writing task status to: %s", path);
+ pushTaskFile(path, statusFile);
+ log.info("Wrote task status to: %s", path);
+ }
+
private void pushTaskFile(Path path, File logFile) throws IOException
{
final FileSystem fs = path.getFileSystem(hadoopConfig);
@@ -100,6 +109,13 @@ public class HdfsTaskLogs implements TaskLogs
return streamTaskFile(path, 0);
}
+ @Override
+ public Optional<InputStream> streamTaskStatus(String taskId) throws
IOException
+ {
+ final Path path = getTaskStatusFileFromId(taskId);
+ return streamTaskFile(path, 0);
+ }
+
private Optional<InputStream> streamTaskFile(final Path path, final long
offset) throws IOException
{
final FileSystem fs = path.getFileSystem(hadoopConfig);
@@ -139,6 +155,15 @@ public class HdfsTaskLogs implements TaskLogs
return new Path(mergePaths(config.getDirectory(), taskId.replace(':', '_')
+ ".reports.json"));
}
+ /**
+ * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed
in
+ * path names. So we format paths differently for HDFS.
+ */
+ private Path getTaskStatusFileFromId(String taskId)
+ {
+ return new Path(mergePaths(config.getDirectory(), taskId.replace(':', '_')
+ ".status.json"));
+ }
+
// some hadoop version Path.mergePaths does not exist
private static String mergePaths(String path1, String path2)
{
diff --git
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
index 9724cba694..9d0273d1a9 100644
---
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
+++
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
@@ -78,6 +78,23 @@ public class HdfsTaskLogsTest
Assert.assertEquals("blah blah", readLog(taskLogs, "foo", 0));
}
+ @Test
+ public void test_taskStatus() throws Exception
+ {
+ final File tmpDir = tempFolder.newFolder();
+ final File logDir = new File(tmpDir, "logs");
+ final File statusFile = new File(tmpDir, "status.json");
+ final TaskLogs taskLogs = new HdfsTaskLogs(new
HdfsTaskLogsConfig(logDir.toString()), new Configuration());
+
+
+ Files.write("{}", statusFile, StandardCharsets.UTF_8);
+ taskLogs.pushTaskStatus("id", statusFile);
+ Assert.assertEquals(
+ "{}",
+
StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskStatus("id").get()))
+ );
+ }
+
@Test
public void testKill() throws Exception
{
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
index 27544e1eec..6868ba3c99 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
@@ -77,6 +77,13 @@ public class S3TaskLogs implements TaskLogs
return streamTaskFile(0, taskKey);
}
+ @Override
+ public Optional<InputStream> streamTaskStatus(String taskid) throws
IOException
+ {
+ final String taskKey = getTaskLogKey(taskid, "status.json");
+ return streamTaskFile(0, taskKey);
+ }
+
private Optional<InputStream> streamTaskFile(final long offset, String
taskKey) throws IOException
{
try {
@@ -141,6 +148,14 @@ public class S3TaskLogs implements TaskLogs
pushTaskFile(reportFile, taskKey);
}
+ @Override
+ public void pushTaskStatus(String taskid, File statusFile) throws IOException
+ {
+ final String taskKey = getTaskLogKey(taskid, "status.json");
+ log.info("Pushing task status %s to: %s", statusFile, taskKey);
+ pushTaskFile(statusFile, taskKey);
+ }
+
private void pushTaskFile(final File logFile, String taskKey) throws
IOException
{
try {
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
index 7b82ca102b..13a6e07cc5 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
@@ -76,6 +76,7 @@ public class S3TaskLogsTest extends EasyMockSupport
private static final Exception NON_RECOVERABLE_EXCEPTION = new
SdkClientException(new NullPointerException());
private static final String LOG_CONTENTS = "log_contents";
private static final String REPORT_CONTENTS = "report_contents";
+ private static final String STATUS_CONTENTS = "status_contents";
@Mock
private CurrentTimeMillisSupplier timeSupplier;
@@ -115,6 +116,31 @@ public class S3TaskLogsTest extends EasyMockSupport
);
Assert.assertEquals("The Grant should have full control permission",
Permission.FullControl, grant.getPermission());
}
+
+ @Test
+ public void test_pushTaskStatus() throws IOException
+ {
+
EasyMock.expect(s3Client.putObject(EasyMock.anyObject(PutObjectRequest.class)))
+ .andReturn(new PutObjectResult())
+ .once();
+
+ EasyMock.replay(s3Client);
+
+ S3TaskLogsConfig config = new S3TaskLogsConfig();
+ config.setS3Bucket(TEST_BUCKET);
+ config.setDisableAcl(true);
+
+ CurrentTimeMillisSupplier timeSupplier = new CurrentTimeMillisSupplier();
+ S3InputDataConfig inputDataConfig = new S3InputDataConfig();
+ S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig,
timeSupplier);
+
+ String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z";
+ File statusFile = tempFolder.newFile("status.json");
+
+ s3TaskLogs.pushTaskStatus(taskId, statusFile); // exercise the status path, not pushTaskLog
+
+ EasyMock.verify(s3Client);
+ }
@Test
public void test_killAll_noException_deletesAllTaskLogs() throws IOException
@@ -434,6 +460,32 @@ public class S3TaskLogsTest extends EasyMockSupport
Assert.assertEquals(REPORT_CONTENTS, report);
}
+ @Test
+ public void test_status_fetch() throws IOException
+ {
+ EasyMock.reset(s3Client);
+ String logPath = TEST_PREFIX + "/" + KEY_1 + "/status.json";
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentLength(STATUS_CONTENTS.length());
+ EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET,
logPath)).andReturn(objectMetadata);
+ S3Object s3Object = new S3Object();
+ s3Object.setObjectContent(new
ByteArrayInputStream(STATUS_CONTENTS.getBytes(StandardCharsets.UTF_8)));
+ GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET,
logPath);
+ getObjectRequest.setRange(0, STATUS_CONTENTS.length() - 1);
+ getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
+ EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
+ EasyMock.replay(s3Client);
+
+ S3TaskLogs s3TaskLogs = getS3TaskLogs();
+
+ Optional<InputStream> inputStreamOptional =
s3TaskLogs.streamTaskStatus(KEY_1);
+ String report = new BufferedReader(
+ new InputStreamReader(inputStreamOptional.get(),
StandardCharsets.UTF_8))
+ .lines()
+ .collect(Collectors.joining("\n"));
+
+ Assert.assertEquals(STATUS_CONTENTS, report);
+ }
@Nonnull
private S3TaskLogs getS3TaskLogs()
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index 72bff3784b..3ad7b45285 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -50,6 +50,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
+import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
@@ -94,7 +95,9 @@ public abstract class AbstractTask implements Task
private final String dataSource;
private final Map<String, Object> context;
+
private File reportsFile;
+ private File statusFile;
private final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
@@ -147,6 +150,7 @@ public abstract class AbstractTask implements Task
File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt",
toolbox.getAttemptId()).toFile();
FileUtils.mkdirp(attemptDir);
reportsFile = new File(attemptDir, "report.json");
+ statusFile = new File(attemptDir, "status.json");
InetAddress hostName = InetAddress.getLocalHost();
DruidNode node = toolbox.getTaskExecutorNode();
toolbox.getTaskActionClient().submit(new
UpdateLocationAction(TaskLocation.create(
@@ -160,48 +164,55 @@ public abstract class AbstractTask implements Task
@Override
public final TaskStatus run(TaskToolbox taskToolbox) throws Exception
{
- boolean failure = false;
+ TaskStatus taskStatus = TaskStatus.running(getId()); // placeholder until an outcome is known
try {
String errorMessage = setup(taskToolbox);
if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
- return TaskStatus.failure(getId(), errorMessage);
+ return taskStatus = TaskStatus.failure(getId(), errorMessage); // record it so cleanUp sees the failure, not RUNNING
}
- TaskStatus taskStatus = runTask(taskToolbox);
- if (taskStatus.isFailure()) {
- failure = true;
- }
+ taskStatus = runTask(taskToolbox);
return taskStatus;
}
catch (Exception e) {
- failure = true;
+ taskStatus = TaskStatus.failure(getId(), e.toString());
throw e;
}
finally {
- cleanUp(taskToolbox, failure);
+ cleanUp(taskToolbox, taskStatus); // always runs; receives the final status on every exit path
}
}
public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception;
- public void cleanUp(TaskToolbox toolbox, boolean failure) throws Exception
+ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws
Exception
{
- if (toolbox.getConfig().isEncapsulatedTask()) {
- // report back to the overlord
- UpdateStatusAction status = new UpdateStatusAction("successful");
- if (failure) {
- status = new UpdateStatusAction("failure");
- }
- toolbox.getTaskActionClient().submit(status);
- toolbox.getTaskActionClient().submit(new
UpdateLocationAction(TaskLocation.unknown()));
-
- if (reportsFile != null && reportsFile.exists()) {
- toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile);
- log.debug("Pushed task reports");
- } else {
- log.debug("No task reports file exists to push");
- }
- } else {
+ if (!toolbox.getConfig().isEncapsulatedTask()) {
log.debug("Not pushing task logs and reports from task.");
+ return;
+ }
+
+ // report back to the overlord
+ UpdateStatusAction status = new UpdateStatusAction("successful");
+ if (taskStatus.isFailure()) {
+ status = new UpdateStatusAction("failure");
+ }
+ toolbox.getTaskActionClient().submit(status);
+ toolbox.getTaskActionClient().submit(new
UpdateLocationAction(TaskLocation.unknown()));
+
+ if (reportsFile != null && reportsFile.exists()) {
+ toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile);
+ log.debug("Pushed task reports");
+ } else {
+ log.debug("No task reports file exists to push");
+ }
+
+ if (statusFile != null) {
+ toolbox.getJsonMapper().writeValue(statusFile, taskStatus);
+ toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile);
+ Files.deleteIfExists(statusFile.toPath());
+ log.debug("Pushed task status");
+ } else {
+ log.debug("No task status file exists to push");
}
}
@@ -281,12 +292,12 @@ public abstract class AbstractTask implements Task
public String toString()
{
return "AbstractTask{" +
- "id='" + id + '\'' +
- ", groupId='" + groupId + '\'' +
- ", taskResource=" + taskResource +
- ", dataSource='" + dataSource + '\'' +
- ", context=" + context +
- '}';
+ "id='" + id + '\'' +
+ ", groupId='" + groupId + '\'' +
+ ", taskResource=" + taskResource +
+ ", dataSource='" + dataSource + '\'' +
+ ", context=" + context +
+ '}';
}
public TaskStatus success()
@@ -372,8 +383,8 @@ public abstract class AbstractTask implements Task
protected static IngestionMode computeBatchIngestionMode(@Nullable
BatchIOConfig ioConfig)
{
final boolean isAppendToExisting = ioConfig == null
- ? BatchIOConfig.DEFAULT_APPEND_EXISTING
- : ioConfig.isAppendToExisting();
+ ? BatchIOConfig.DEFAULT_APPEND_EXISTING
+ : ioConfig.isAppendToExisting();
final boolean isDropExisting = ioConfig == null ?
BatchIOConfig.DEFAULT_DROP_EXISTING : ioConfig.isDropExisting();
return computeIngestionMode(isAppendToExisting, isDropExisting);
}
@@ -388,7 +399,7 @@ public abstract class AbstractTask implements Task
return IngestionMode.REPLACE_LEGACY;
}
throw new IAE("Cannot simultaneously replace and append to existing
segments. "
- + "Either dropExisting or appendToExisting should be set to
false");
+ + "Either dropExisting or appendToExisting should be set to false");
}
public void emitMetric(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
index 5d546da494..a8e5b17245 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
@@ -65,6 +65,15 @@ public class FileTaskLogs implements TaskLogs
log.info("Wrote task report to: %s", outputFile);
}
+ @Override
+ public void pushTaskStatus(String taskid, File statusFile) throws IOException
+ {
+ FileUtils.mkdirp(config.getDirectory());
+ final File outputFile = fileForTask(taskid, "status.json"); // fixed name: the one streamTaskStatus reads back
+ Files.copy(statusFile, outputFile);
+ log.info("Wrote task status to: %s", outputFile);
+ }
+
@Override
public Optional<InputStream> streamTaskLog(final String taskid, final long
offset) throws IOException
{
@@ -87,6 +96,17 @@ public class FileTaskLogs implements TaskLogs
}
}
+ @Override
+ public Optional<InputStream> streamTaskStatus(final String taskid) throws
IOException
+ {
+ final File file = fileForTask(taskid, "status.json");
+ if (file.exists()) {
+ return Optional.of(LogUtils.streamFile(file, 0));
+ } else {
+ return Optional.absent();
+ }
+ }
+
private File fileForTask(final String taskid, String filename)
{
return new File(config.getDirectory(), StringUtils.format("%s.%s", taskid,
filename));
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
index ab0a06b812..cf03134c4a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
@@ -19,15 +19,18 @@
package org.apache.druid.indexing.common.task;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.UpdateStatusAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -48,10 +51,17 @@ import static org.mockito.Mockito.when;
public class AbstractTaskTest
{
+ private ObjectMapper objectMapper;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @Before
+ public void setup()
+ {
+ objectMapper = new TestUtils().getTestObjectMapper();
+ }
+
@Test
public void testSetupAndCleanupIsCalledWtihParameter() throws Exception
{
@@ -73,6 +83,7 @@ public class AbstractTaskTest
File folder = temporaryFolder.newFolder();
when(config.getTaskDir(eq("myID"))).thenReturn(folder);
when(toolbox.getConfig()).thenReturn(config);
+ when(toolbox.getJsonMapper()).thenReturn(objectMapper);
TaskActionClient taskActionClient = mock(TaskActionClient.class);
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
@@ -89,7 +100,9 @@ public class AbstractTaskTest
String result = super.setup(toolbox);
File attemptDir = Paths.get(folder.getAbsolutePath(), "attempt",
toolbox.getAttemptId()).toFile();
File reportsDir = new File(attemptDir, "report.json");
+ File statusDir = new File(attemptDir, "status.json");
FileUtils.write(reportsDir, "foo", StandardCharsets.UTF_8);
+ FileUtils.write(statusDir, "{}", StandardCharsets.UTF_8);
return result;
}
};
@@ -98,6 +111,7 @@ public class AbstractTaskTest
// call it 3 times, once to update location in setup, then one for status
and location in cleanup
Mockito.verify(taskActionClient, times(3)).submit(any());
verify(pusher, times(1)).pushTaskReports(eq("myID"), any());
+ verify(pusher, times(1)).pushTaskStatus(eq("myID"), any());
}
@Test
@@ -117,6 +131,7 @@ public class AbstractTaskTest
File folder = temporaryFolder.newFolder();
when(config.getTaskDir(eq("myID"))).thenReturn(folder);
when(toolbox.getConfig()).thenReturn(config);
+ when(toolbox.getJsonMapper()).thenReturn(objectMapper);
TaskActionClient taskActionClient = mock(TaskActionClient.class);
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
@@ -161,6 +176,7 @@ public class AbstractTaskTest
File folder = temporaryFolder.newFolder();
when(config.getTaskDir(eq("myID"))).thenReturn(folder);
when(toolbox.getConfig()).thenReturn(config);
+ when(toolbox.getJsonMapper()).thenReturn(objectMapper);
TaskActionClient taskActionClient = mock(TaskActionClient.class);
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
@@ -169,7 +185,7 @@ public class AbstractTaskTest
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null,
null)
{
@Override
- public TaskStatus runTask(TaskToolbox toolbox)
+ public TaskStatus runTask(TaskToolbox toolbox)
{
return TaskStatus.failure("myId", "failed");
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
index 510d0883d3..a09644ea6f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
@@ -244,7 +244,7 @@ public class TaskMonitorTest
}
@Override
- public void cleanUp(TaskToolbox toolbox, boolean failure)
+ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
{
// do nothing
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java
index 22c13e3464..b88b31443e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
+import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
import org.apache.druid.java.util.common.FileUtils;
@@ -95,6 +96,28 @@ public class FileTaskLogsTest
);
}
+ @Test
+ public void testSimpleStatus() throws Exception
+ {
+ final ObjectMapper mapper = TestHelper.makeJsonMapper();
+ final File tmpDir = temporaryFolder.newFolder();
+ final File logDir = new File(tmpDir, "druid/myTask");
+ final File statusFile = new File(tmpDir, "status.json");
+
+ final String taskId = "myTask";
+ final TaskStatus taskStatus = TaskStatus.success(taskId);
+ final String taskStatusString = mapper.writeValueAsString(taskStatus);
+ Files.write(taskStatusString, statusFile, StandardCharsets.UTF_8);
+
+ final TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(logDir));
+ taskLogs.pushTaskStatus(taskId, statusFile);
+
+ Assert.assertEquals(
+ taskStatusString,
+
StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskStatus(taskId).get()))
+ );
+ }
+
@Test
public void testPushTaskLogDirCreationFails() throws Exception
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 7ea2325f12..a185b583c2 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -163,7 +163,7 @@ public class SingleTaskBackgroundRunnerTest
}
@Override
- public void cleanUp(TaskToolbox toolbox, boolean failure)
+ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
{
// do nothing
}
@@ -181,10 +181,10 @@ public class SingleTaskBackgroundRunnerTest
final QueryRunner<ScanResultValue> queryRunner =
Druids.newScanQueryBuilder()
- .dataSource("foo")
- .intervals(new
MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
- .build()
- .getRunner(runner);
+ .dataSource("foo")
+ .intervals(new
MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
+ .build()
+ .getRunner(runner);
Assert.assertThat(queryRunner,
CoreMatchers.instanceOf(SetAndVerifyContextQueryRunner.class));
}
@@ -262,7 +262,7 @@ public class SingleTaskBackgroundRunnerTest
}
@Override
- public void cleanUp(TaskToolbox toolbox, boolean failure)
+ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
{
// do nothing
}
@@ -384,7 +384,7 @@ public class SingleTaskBackgroundRunnerTest
}
@Override
- public void cleanUp(TaskToolbox toolbox, boolean failure)
+ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
{
// do nothing
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 616d2ca8ba..6388cdd573 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -412,10 +412,10 @@ public class TaskQueueTest extends IngestionTestBase
TaskLocation.create("worker", 1, 2)
), workerHolder);
while (!taskRunner.getRunningTasks()
- .stream()
- .map(TaskRunnerWorkItem::getTaskId)
- .collect(Collectors.toList())
- .contains(task.getId())) {
+ .stream()
+ .map(TaskRunnerWorkItem::getTaskId)
+ .collect(Collectors.toList())
+ .contains(task.getId())) {
Thread.sleep(100);
}
taskQueue.shutdown(task.getId(), "shutdown");
@@ -435,7 +435,7 @@ public class TaskQueueTest extends IngestionTestBase
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider =
EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
- .andReturn(druidNodeDiscovery);
+ .andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
TaskStorage taskStorageMock = EasyMock.createStrictMock(TaskStorage.class);
for (String taskId : runningTasks) {
@@ -520,7 +520,7 @@ public class TaskQueueTest extends IngestionTestBase
}
@Override
- public void cleanUp(TaskToolbox toolbox, boolean failure)
+ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
{
// do nothing
}
diff --git
a/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
b/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
index 13962d7c4d..287b2f6fcc 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
+++ b/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
@@ -47,6 +47,12 @@ public class NoopTaskLogs implements TaskLogs
log.info("Not pushing reports for task: %s", taskid);
}
+ @Override
+ public void pushTaskStatus(String taskid, File statusFile)
+ {
+ log.info("Not pushing status for task: %s", taskid);
+ }
+
@Override
public void killAll()
{
diff --git
a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
index 0a06237e7f..a1cb317de3 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
+++ b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
@@ -35,4 +35,8 @@ public interface TaskLogPusher
default void pushTaskReports(String taskid, File reportFile) throws
IOException
{
}
+
+ default void pushTaskStatus(String taskid, File reportFile) throws
IOException
+ {
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
index 04add17ea5..1ec6a94df1 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
+++ b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
@@ -44,4 +44,9 @@ public interface TaskLogStreamer
{
return Optional.absent();
}
+
+ default Optional<InputStream> streamTaskStatus(final String taskid) throws
IOException
+ {
+ return Optional.absent();
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
similarity index 70%
copy from processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
copy to processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
index 0a06237e7f..30192932a9 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
+++ b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
@@ -19,20 +19,17 @@
package org.apache.druid.tasklogs;
-import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.junit.Assert;
+import org.junit.Test;
-import java.io.File;
import java.io.IOException;
-/**
- * Something that knows how to persist local task logs to some form of
long-term storage.
- */
-@ExtensionPoint
-public interface TaskLogPusher
+public class NoopTaskLogsTest
{
- void pushTaskLog(String taskid, File logFile) throws IOException;
-
- default void pushTaskReports(String taskid, File reportFile) throws
IOException
+ @Test
+ public void test_streamTaskStatus() throws IOException
{
+ TaskLogs taskLogs = new NoopTaskLogs();
+ Assert.assertFalse(taskLogs.streamTaskStatus("id").isPresent());
}
}
diff --git
a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java
similarity index 66%
copy from processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
copy to
processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java
index 0a06237e7f..15b67409b3 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
+++ b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java
@@ -19,20 +19,27 @@
package org.apache.druid.tasklogs;
-import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.junit.Test;
import java.io.File;
import java.io.IOException;
-/**
- * Something that knows how to persist local task logs to some form of
long-term storage.
- */
-@ExtensionPoint
-public interface TaskLogPusher
+public class TaskLogPusherTest
{
- void pushTaskLog(String taskid, File logFile) throws IOException;
-
- default void pushTaskReports(String taskid, File reportFile) throws
IOException
+ /**
+ * Test default implementation of pushTaskStatus in TaskLogPusher interface
for code coverage
+ *
+ * @throws IOException
+ */
+ @Test
+ public void test_pushTaskStatus() throws IOException
{
+ TaskLogPusher taskLogPusher = new TaskLogPusher() {
+ @Override
+ public void pushTaskLog(String taskid, File logFile)
+ {
+ }
+ };
+ taskLogPusher.pushTaskStatus("id", new File(""));
}
}
diff --git
a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java
similarity index 58%
copy from
processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
copy to
processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java
index 04add17ea5..e61b9f0a1a 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
+++
b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java
@@ -20,28 +20,29 @@
package org.apache.druid.tasklogs;
import com.google.common.base.Optional;
-import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.junit.Assert;
+import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
-/**
- * Something that knows how to stream logs for tasks.
- */
-@ExtensionPoint
-public interface TaskLogStreamer
+public class TaskLogStreamerTest
{
/**
- * Stream log for a task.
+ * Test default implementation of streamTaskStatus in TaskLogStreamer
interface for code coverage
*
- * @param offset If zero, stream the entire log. If positive, attempt to
read from this position onwards. If
- * negative, attempt to read this many bytes from the end of
the file (like <tt>tail -n</tt>).
- * @return inputStream for this log, if available
+ * @throws IOException
*/
- Optional<InputStream> streamTaskLog(String taskid, long offset) throws
IOException;
-
- default Optional<InputStream> streamTaskReports(final String taskid) throws
IOException
+ @Test
+ public void test_streamTaskStatus() throws IOException
{
- return Optional.absent();
+ TaskLogStreamer taskLogStreamer = new TaskLogStreamer() {
+ @Override
+ public Optional<InputStream> streamTaskLog(String taskid, long offset)
+ {
+ return Optional.absent();
+ }
+ };
+ Assert.assertFalse(taskLogStreamer.streamTaskStatus("id").isPresent());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]