This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new be3f93e3cf Restore tasks when lifecycle start (#14909)
be3f93e3cf is described below
commit be3f93e3cf0c9f10292d1d5349b24a9402dd38de
Author: YongGang <[email protected]>
AuthorDate: Fri Sep 22 12:03:34 2023 -0700
Restore tasks when lifecycle start (#14909)
* K8s task restoration should happen on lifecycle start
* add test
* add more tests
* fix test
* wait for task restoration to finish on start
* fix style
* revert previous change and add comment
---
.../druid/k8s/overlord/KubernetesTaskRunner.java | 23 +--
.../k8s/overlord/KubernetesTaskRunnerTest.java | 159 +++++++++++----------
2 files changed, 95 insertions(+), 87 deletions(-)
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 14fe54778a..33efd848d0 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -23,6 +23,7 @@ import com.google.api.client.util.Lists;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -60,7 +61,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -309,23 +309,25 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
- List<Pair<Task, ListenableFuture<TaskStatus>>> restoredTasks = new
ArrayList<>();
+ return ImmutableList.of();
+ }
+
+ @Override
+ @LifecycleStart
+ public void start()
+ {
+ log.info("Starting K8sTaskRunner...");
+ // Load tasks from previously running jobs and wait for their statuses to
be updated asynchronously.
for (Job job : client.getPeonJobs()) {
try {
- Task task = adapter.toTask(job);
- restoredTasks.add(Pair.of(task, joinAsync(task)));
+ joinAsync(adapter.toTask(job));
}
catch (IOException e) {
log.error(e, "Error deserializing task from job [%s]",
job.getMetadata().getName());
}
}
- return restoredTasks;
- }
+ log.info("Loaded %,d tasks from previous run", tasks.size());
- @Override
- @LifecycleStart
- public void start()
- {
cleanupExecutor.scheduleAtFixedRate(
() ->
client.deleteCompletedPeonJobsOlderThan(
@@ -339,7 +341,6 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
log.debug("Started cleanup executor for jobs older than %s...",
config.getTaskCleanupDelay());
}
-
@Override
@LifecycleStop
public void stop()
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 11e7a75af6..e6b1b8006a 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -31,7 +31,6 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
-import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.http.client.HttpClient;
@@ -56,7 +55,6 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -101,6 +99,89 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
);
}
+ @Test
+ public void test_start_withExistingJobs() throws IOException
+ {
+ KubernetesTaskRunner runner = new KubernetesTaskRunner(
+ taskAdapter,
+ config,
+ peonClient,
+ httpClient,
+ new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+ emitter
+ )
+ {
+ @Override
+ protected ListenableFuture<TaskStatus> joinAsync(Task task)
+ {
+ return tasks.computeIfAbsent(
+ task.getId(),
+ k -> new KubernetesWorkItem(
+ task,
+ Futures.immediateFuture(TaskStatus.success(task.getId()))
+ )
+ ).getResult();
+ }
+ };
+
+ Job job = new JobBuilder()
+ .withNewMetadata()
+ .withName(ID)
+ .endMetadata()
+ .build();
+
+ EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
+ EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);
+
+ replayAll();
+
+ runner.start();
+
+ verifyAll();
+
+ Assert.assertNotNull(runner.tasks);
+ Assert.assertEquals(1, runner.tasks.size());
+ }
+
+ @Test
+ public void test_start_whenDeserializationExceptionThrown_isIgnored() throws
IOException
+ {
+ KubernetesTaskRunner runner = new KubernetesTaskRunner(
+ taskAdapter,
+ config,
+ peonClient,
+ httpClient,
+ new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+ emitter
+ )
+ {
+ @Override
+ protected ListenableFuture<TaskStatus> joinAsync(Task task)
+ {
+ return tasks.computeIfAbsent(task.getId(), k -> new
KubernetesWorkItem(task, null))
+ .getResult();
+ }
+ };
+
+ Job job = new JobBuilder()
+ .withNewMetadata()
+ .withName(ID)
+ .endMetadata()
+ .build();
+
+ EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
+ EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException());
+
+ replayAll();
+
+ runner.start();
+
+ verifyAll();
+
+ Assert.assertNotNull(runner.tasks);
+ Assert.assertEquals(0, runner.tasks.size());
+ }
+
@Test
public void test_streamTaskLog_withoutExistingTask_returnsEmptyOptional()
{
@@ -263,80 +344,6 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
runner.shutdown(task.getId(), "");
}
- @Test
- public void test_restore_withExistingJobs() throws IOException
- {
- KubernetesTaskRunner runner = new KubernetesTaskRunner(
- taskAdapter,
- config,
- peonClient,
- httpClient,
- new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
- emitter
- ) {
- @Override
- protected ListenableFuture<TaskStatus> joinAsync(Task task)
- {
- return new KubernetesWorkItem(task, null).getResult();
- }
- };
-
- Job job = new JobBuilder()
- .withNewMetadata()
- .withName(ID)
- .endMetadata()
- .build();
-
- EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
- EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);
-
- replayAll();
-
- List<Pair<Task, ListenableFuture<TaskStatus>>> tasks = runner.restore();
-
- verifyAll();
-
- Assert.assertNotNull(tasks);
- Assert.assertEquals(1, tasks.size());
- }
-
- @Test
- public void test_restore_whenDeserializationExceptionThrown_isIgnored()
throws IOException
- {
- KubernetesTaskRunner runner = new KubernetesTaskRunner(
- taskAdapter,
- config,
- peonClient,
- httpClient,
- new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
- emitter
- ) {
- @Override
- protected ListenableFuture<TaskStatus> joinAsync(Task task)
- {
- return new KubernetesWorkItem(task, null).getResult();
- }
- };
-
- Job job = new JobBuilder()
- .withNewMetadata()
- .withName(ID)
- .endMetadata()
- .build();
-
- EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
- EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException());
-
- replayAll();
-
- List<Pair<Task, ListenableFuture<TaskStatus>>> tasks = runner.restore();
-
- verifyAll();
-
- Assert.assertNotNull(tasks);
- Assert.assertEquals(0, tasks.size());
- }
-
@Test
public void test_getTotalTaskSlotCount()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]