nlippis commented on code in PR #14156:
URL: https://github.com/apache/druid/pull/14156#discussion_r1178292269
##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -89,174 +69,128 @@
/**
* Runs tasks as k8s jobs using the "internal peon" verb.
- * One additional feature of this class is that kubernetes is the source of truth, so if you launch a task
Review Comment:
I wanted to rewrite this comment so that it accurately describes how tasks
are handled in the new system. I can put together a follow-up PR with the
description.
##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -89,174 +69,128 @@
/**
* Runs tasks as k8s jobs using the "internal peon" verb.
- * One additional feature of this class is that kubernetes is the source of truth, so if you launch a task
- * shutdown druid, bring up druid, the task will keep running and the state will be updated when the cluster
- * comes back. Thus while no tasks are technically restorable, all tasks once launched will run in isolation to the
- * extent possible without requiring the overlord consistently up during their lifetime.
*/
-
public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
{
-
private static final EmittingLogger log = new EmittingLogger(KubernetesTaskRunner.class);
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
// to cleanup old jobs that might not have been deleted.
private final ScheduledExecutorService cleanupExecutor;
- protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new ConcurrentHashMap<>();
+ protected final ConcurrentHashMap<String, KubernetesWorkItem> tasks = new ConcurrentHashMap<>();
protected final TaskAdapter adapter;
- protected final KubernetesPeonClient client;
- private final ObjectMapper mapper;
- private final KubernetesTaskRunnerConfig k8sConfig;
- private final TaskQueueConfig taskQueueConfig;
- private final TaskLogs taskLogs;
+ private final KubernetesPeonClient client;
+ private final KubernetesTaskRunnerConfig config;
private final ListeningExecutorService exec;
private final HttpClient httpClient;
+ private final PeonLifecycleFactory peonLifecycleFactory;
public KubernetesTaskRunner(
- ObjectMapper mapper,
TaskAdapter adapter,
- KubernetesTaskRunnerConfig k8sConfig,
- TaskQueueConfig taskQueueConfig,
- TaskLogs taskLogs,
+ KubernetesTaskRunnerConfig config,
KubernetesPeonClient client,
- HttpClient httpClient
+ HttpClient httpClient,
+ PeonLifecycleFactory peonLifecycleFactory
)
{
- this.mapper = mapper;
this.adapter = adapter;
- this.k8sConfig = k8sConfig;
- this.taskQueueConfig = taskQueueConfig;
- this.taskLogs = taskLogs;
+ this.config = config;
this.client = client;
this.httpClient = httpClient;
+ this.peonLifecycleFactory = peonLifecycleFactory;
this.cleanupExecutor = Executors.newScheduledThreadPool(1);
this.exec = MoreExecutors.listeningDecorator(
- Execs.multiThreaded(taskQueueConfig.getMaxSize(), "k8s-task-runner-%d")
- );
- Preconditions.checkArgument(
Review Comment:
I don't really see the value in doing that. If the k8s task runner capacity is
greater than the max queue size, then at most max-queue-size tasks will be
running. This is the behavior the user would expect, and since capacity is
virtual (we don't spin up any long-lived resources based on the value of
capacity), there aren't any performance or cost drawbacks.
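
To illustrate the point about capacity being virtual, here is a minimal sketch.
It assumes the runner's executor behaves like a plain fixed-size
`java.util.concurrent` thread pool, which starts worker threads lazily.
`CapacitySketch` and `threadsStarted` are hypothetical names used only for this
illustration; they are not code from this PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CapacitySketch
{
  // Hypothetical helper: builds a fixed-size pool of `capacity` threads,
  // submits `maxQueueSize` blocking tasks, and reports how many worker
  // threads the pool actually started.
  static int threadsStarted(int capacity, int maxQueueSize)
  {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        capacity,
        capacity,
        0L,
        TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>()
    );
    // Keeps every submitted task "running" until we have read the pool size.
    CountDownLatch release = new CountDownLatch(1);
    try {
      for (int i = 0; i < maxQueueSize; i++) {
        pool.execute(() -> {
          try {
            release.await();
          }
          catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      // Worker threads are created on demand, one per submitted task,
      // until the pool reaches its configured size.
      return pool.getPoolSize();
    }
    finally {
      release.countDown();
      pool.shutdown();
    }
  }
}
```

Under that assumption, a pool sized to 10 that only ever receives 3 tasks starts
3 threads, so an oversized capacity costs nothing until tasks actually arrive.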