This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3954685aae Report more metrics to monitor K8s task runner (#14771)
3954685aae is described below
commit 3954685aae1f9a78c81bf788e8f69d10324c28d1
Author: YongGang <[email protected]>
AuthorDate: Wed Aug 16 11:03:53 2023 -0700
Report more metrics to monitor K8s task runner (#14771)
* Report pod running metrics to monitor K8s task runner
* refine method definition
* fix checkstyle
* implement task metrics
* more comment
* address comments
* update doc for the new metrics reported
* fix checkstyle
* refine method definition
* minor refine
---
docs/development/extensions-contrib/k8s-jobs.md | 6 +++++
.../k8s/overlord/KubernetesPeonLifecycle.java | 3 +++
.../druid/k8s/overlord/KubernetesTaskRunner.java | 9 ++++---
.../k8s/overlord/KubernetesTaskRunnerFactory.java | 3 ++-
.../k8s/overlord/common/KubernetesPeonClient.java | 23 +++++++++++++++--
.../k8s/overlord/KubernetesPeonLifecycleTest.java | 3 +++
.../k8s/overlord/KubernetesTaskRunnerTest.java | 2 +-
.../overlord/common/KubernetesPeonClientTest.java | 29 ++++++++++++++++++----
.../DruidPeonClientIntegrationTest.java | 13 ++++++++--
9 files changed, 76 insertions(+), 15 deletions(-)
diff --git a/docs/development/extensions-contrib/k8s-jobs.md
b/docs/development/extensions-contrib/k8s-jobs.md
index cd925c2ee0..6917fbd87f 100644
--- a/docs/development/extensions-contrib/k8s-jobs.md
+++ b/docs/development/extensions-contrib/k8s-jobs.md
@@ -234,6 +234,12 @@ data:
|`druid.indexer.runner.graceTerminationPeriodSeconds`| `Long` |
Number of seconds you want to wait after a sigterm for container lifecycle
hooks to complete. Keep at a smaller value if you want tasks to hold locks for
shorter periods.
|`PT30S` (K8s default)|No|
|`druid.indexer.runner.capacity`| `Integer` | Number of concurrent jobs
that can be sent to Kubernetes.
|`2147483647`|No|
+### Metrics added
+
+|Metric|Description|Dimensions|Normal value|
+|------|-----------|----------|------------|
+| `k8s/peon/startup/time` | Time in milliseconds taken for the peon pod to
start up. | `dataSource`, `taskId`, `taskType`, `groupId`, `taskStatus`, `tags`
|Varies|
+
### Gotchas
- All Druid Pods belonging to one Druid cluster must be inside the same
Kubernetes namespace.
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index 447a8632bb..f6b15f46bc 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -84,6 +84,7 @@ public class KubernetesPeonLifecycle
private final AtomicReference<State> state = new
AtomicReference<>(State.NOT_STARTED);
private final K8sTaskId taskId;
private final TaskLogs taskLogs;
+ private final Task task;
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;
@@ -102,6 +103,7 @@ public class KubernetesPeonLifecycle
)
{
this.taskId = new K8sTaskId(task);
+ this.task = task;
this.kubernetesClient = kubernetesClient;
this.taskLogs = taskLogs;
this.mapper = mapper;
@@ -126,6 +128,7 @@ public class KubernetesPeonLifecycle
taskLocation = null;
kubernetesClient.launchPeonJobAndWaitForStart(
job,
+ task,
launchTimeout,
TimeUnit.MILLISECONDS
);
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 99095264c3..8fc6a5624d 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -108,7 +108,8 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
private final HttpClient httpClient;
private final PeonLifecycleFactory peonLifecycleFactory;
private final ServiceEmitter emitter;
-
+ // Worker categories are not currently supported, so a single hardcoded category is used.
+ protected static final String WORKER_CATEGORY = "_k8s_worker_category";
public KubernetesTaskRunner(
TaskAdapter adapter,
@@ -356,7 +357,7 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
@Override
public Map<String, Long> getTotalTaskSlotCount()
{
- return ImmutableMap.of("taskQueue", (long) config.getCapacity());
+ return ImmutableMap.of(WORKER_CATEGORY, (long) config.getCapacity());
}
@Override
@@ -374,13 +375,13 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
@Override
public Map<String, Long> getIdleTaskSlotCount()
{
- return Collections.emptyMap();
+ return ImmutableMap.of(WORKER_CATEGORY, (long) Math.max(0,
config.getCapacity() - tasks.size()));
}
@Override
public Map<String, Long> getUsedTaskSlotCount()
{
- return Collections.emptyMap();
+ return ImmutableMap.of(WORKER_CATEGORY, (long)
Math.min(config.getCapacity(), tasks.size()));
}
@Override
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
index 76698ba8fe..92fc220e62 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
@@ -92,7 +92,8 @@ public class KubernetesTaskRunnerFactory implements
TaskRunnerFactory<Kubernetes
KubernetesPeonClient peonClient = new KubernetesPeonClient(
druidKubernetesClient,
kubernetesTaskRunnerConfig.getNamespace(),
- kubernetesTaskRunnerConfig.isDebugJobs()
+ kubernetesTaskRunnerConfig.isDebugJobs(),
+ emitter
);
runner = new KubernetesTaskRunner(
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
index 4ef3a7bdaf..320bffba94 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
@@ -25,8 +25,12 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.LogWatch;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import java.io.InputStream;
import java.sql.Timestamp;
@@ -42,15 +46,22 @@ public class KubernetesPeonClient
private final KubernetesClientApi clientApi;
private final String namespace;
private final boolean debugJobs;
+ private final ServiceEmitter emitter;
- public KubernetesPeonClient(KubernetesClientApi clientApi, String namespace,
boolean debugJobs)
+ public KubernetesPeonClient(
+ KubernetesClientApi clientApi,
+ String namespace,
+ boolean debugJobs,
+ ServiceEmitter emitter
+ )
{
this.clientApi = clientApi;
this.namespace = namespace;
this.debugJobs = debugJobs;
+ this.emitter = emitter;
}
- public Pod launchPeonJobAndWaitForStart(Job job, long howLong, TimeUnit
timeUnit)
+ public Pod launchPeonJobAndWaitForStart(Job job, Task task, long howLong,
TimeUnit timeUnit)
{
long start = System.currentTimeMillis();
// launch job
@@ -69,6 +80,7 @@ public class KubernetesPeonClient
}, howLong, timeUnit);
long duration = System.currentTimeMillis() - start;
log.info("Took task %s %d ms for pod to startup", jobName, duration);
+ emitK8sPodMetrics(task, "k8s/peon/startup/time", duration);
return result;
});
}
@@ -254,4 +266,11 @@ public class KubernetesPeonClient
throw new KubernetesResourceNotFoundException("K8s pod with label:
job-name=" + jobName + " not found");
}
}
+
+ private void emitK8sPodMetrics(Task task, String metric, long durationMs)
+ {
+ ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
+ IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+ emitter.emit(metricBuilder.build(metric, durationMs));
+ }
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 980a425a85..3a017e5f74 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -100,6 +100,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart(
EasyMock.eq(job),
+ EasyMock.eq(task),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(null);
@@ -144,6 +145,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart(
EasyMock.eq(job),
+ EasyMock.eq(task),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(null);
@@ -192,6 +194,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart(
EasyMock.eq(job),
+ EasyMock.eq(task),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(null);
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 0359488802..613e3b1031 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -367,7 +367,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
MatcherAssert.assertThat(slotCount, Matchers.allOf(
Matchers.aMapWithSize(1),
Matchers.hasEntry(
- Matchers.equalTo("taskQueue"),
+ Matchers.equalTo(KubernetesTaskRunner.WORKER_CATEGORY),
Matchers.equalTo(1L)
)
));
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
index f72ebef1f6..f6096b675d 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
@@ -30,7 +30,10 @@ import
io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -38,6 +41,8 @@ import org.junit.jupiter.api.Test;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -54,12 +59,23 @@ public class KubernetesPeonClientTest
private KubernetesMockServer server;
private KubernetesClientApi clientApi;
private KubernetesPeonClient instance;
+ private ServiceEmitter serviceEmitter;
+ private Collection<Event> events;
@BeforeEach
public void setup()
{
clientApi = new TestKubernetesClient(this.client);
- instance = new KubernetesPeonClient(clientApi, NAMESPACE, false);
+ events = new ArrayList<>();
+ serviceEmitter = new ServiceEmitter("service", "host", null)
+ {
+ @Override
+ public void emit(Event event)
+ {
+ events.add(event);
+ }
+ };
+ instance = new KubernetesPeonClient(clientApi, NAMESPACE, false,
serviceEmitter);
}
@Test
@@ -83,9 +99,10 @@ public class KubernetesPeonClientTest
client.pods().inNamespace(NAMESPACE).resource(pod).create();
- Pod peonPod = instance.launchPeonJobAndWaitForStart(job, 1,
TimeUnit.SECONDS);
+ Pod peonPod = instance.launchPeonJobAndWaitForStart(job,
NoopTask.create(), 1, TimeUnit.SECONDS);
Assertions.assertNotNull(peonPod);
+ Assertions.assertEquals(1, events.size());
}
@Test
@@ -111,7 +128,7 @@ public class KubernetesPeonClientTest
Assertions.assertThrows(
KubernetesClientTimeoutException.class,
- () -> instance.launchPeonJobAndWaitForStart(job, 1, TimeUnit.SECONDS)
+ () -> instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1,
TimeUnit.SECONDS)
);
}
@@ -204,7 +221,8 @@ public class KubernetesPeonClientTest
KubernetesPeonClient instance = new KubernetesPeonClient(
new TestKubernetesClient(this.client),
NAMESPACE,
- true
+ true,
+ serviceEmitter
);
Job job = new JobBuilder()
@@ -228,7 +246,8 @@ public class KubernetesPeonClientTest
KubernetesPeonClient instance = new KubernetesPeonClient(
new TestKubernetesClient(this.client),
NAMESPACE,
- true
+ true,
+ serviceEmitter
);
Assertions.assertTrue(instance.deletePeonJob(new K8sTaskId(ID)));
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
index 327e4276d1..22e2311bbb 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
@@ -34,6 +34,8 @@ import
org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.Task;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.JobResponse;
@@ -90,7 +92,14 @@ public class DruidPeonClientIntegrationTest
new NamedType(IndexTask.IndexTuningConfig.class, "index")
);
k8sClient = new DruidKubernetesClient();
- peonClient = new KubernetesPeonClient(k8sClient, "default", false);
+ ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", null)
+ {
+ @Override
+ public void emit(Event event)
+ {
+ }
+ };
+ peonClient = new KubernetesPeonClient(k8sClient, "default", false,
serviceEmitter);
druidNode = new DruidNode(
"test",
null,
@@ -130,7 +139,7 @@ public class DruidPeonClientIntegrationTest
Job job = adapter.createJobFromPodSpec(podSpec, task, context);
// launch the job and wait to start...
- peonClient.launchPeonJobAndWaitForStart(job, 1, TimeUnit.MINUTES);
+ peonClient.launchPeonJobAndWaitForStart(job, task, 1, TimeUnit.MINUTES);
// there should be one job that is a k8s peon job that exists
List<Job> jobs = peonClient.getPeonJobs();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]