This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3954685aae Report more metrics to monitor K8s task runner (#14771)
3954685aae is described below

commit 3954685aae1f9a78c81bf788e8f69d10324c28d1
Author: YongGang <[email protected]>
AuthorDate: Wed Aug 16 11:03:53 2023 -0700

    Report more metrics to monitor K8s task runner (#14771)
    
    * Report pod running metrics to monitor K8s task runner
    
    * refine method definition
    
    * fix checkstyle
    
    * implement task metrics
    
    * more comment
    
    * address comments
    
    * update doc for the new metrics reported
    
    * fix checkstyle
    
    * refine method definition
    
    * minor refine
---
 docs/development/extensions-contrib/k8s-jobs.md    |  6 +++++
 .../k8s/overlord/KubernetesPeonLifecycle.java      |  3 +++
 .../druid/k8s/overlord/KubernetesTaskRunner.java   |  9 ++++---
 .../k8s/overlord/KubernetesTaskRunnerFactory.java  |  3 ++-
 .../k8s/overlord/common/KubernetesPeonClient.java  | 23 +++++++++++++++--
 .../k8s/overlord/KubernetesPeonLifecycleTest.java  |  3 +++
 .../k8s/overlord/KubernetesTaskRunnerTest.java     |  2 +-
 .../overlord/common/KubernetesPeonClientTest.java  | 29 ++++++++++++++++++----
 .../DruidPeonClientIntegrationTest.java            | 13 ++++++++--
 9 files changed, 76 insertions(+), 15 deletions(-)

diff --git a/docs/development/extensions-contrib/k8s-jobs.md 
b/docs/development/extensions-contrib/k8s-jobs.md
index cd925c2ee0..6917fbd87f 100644
--- a/docs/development/extensions-contrib/k8s-jobs.md
+++ b/docs/development/extensions-contrib/k8s-jobs.md
@@ -234,6 +234,12 @@ data:
 |`druid.indexer.runner.graceTerminationPeriodSeconds`| `Long`          | 
Number of seconds you want to wait after a sigterm for container lifecycle 
hooks to complete.  Keep at a smaller value if you want tasks to hold locks for 
shorter periods.                                                                
      |`PT30S` (K8s default)|No|
 |`druid.indexer.runner.capacity`| `Integer`       | Number of concurrent jobs 
that can be sent to Kubernetes.                                                 
                                                                                
                                                       |`2147483647`|No|
 
+### Metrics added
+
+|Metric|Description|Dimensions|Normal value|
+|------|-----------|----------|------------|
+| `k8s/peon/startup/time` | Time in milliseconds for a peon pod to start up. | `dataSource`, `taskId`, `taskType`, `groupId`, `taskStatus`, `tags` |Varies|
+
 ### Gotchas
 
 - All Druid Pods belonging to one Druid cluster must be inside the same 
Kubernetes namespace.
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index 447a8632bb..f6b15f46bc 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -84,6 +84,7 @@ public class KubernetesPeonLifecycle
   private final AtomicReference<State> state = new 
AtomicReference<>(State.NOT_STARTED);
   private final K8sTaskId taskId;
   private final TaskLogs taskLogs;
+  private final Task task;
   private final KubernetesPeonClient kubernetesClient;
   private final ObjectMapper mapper;
   private final TaskStateListener stateListener;
@@ -102,6 +103,7 @@ public class KubernetesPeonLifecycle
   )
   {
     this.taskId = new K8sTaskId(task);
+    this.task = task;
     this.kubernetesClient = kubernetesClient;
     this.taskLogs = taskLogs;
     this.mapper = mapper;
@@ -126,6 +128,7 @@ public class KubernetesPeonLifecycle
       taskLocation = null;
       kubernetesClient.launchPeonJobAndWaitForStart(
           job,
+          task,
           launchTimeout,
           TimeUnit.MILLISECONDS
       );
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 99095264c3..8fc6a5624d 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -108,7 +108,8 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
   private final HttpClient httpClient;
   private final PeonLifecycleFactory peonLifecycleFactory;
   private final ServiceEmitter emitter;
-
+  // currently worker categories aren't supported, so it's hardcoded.
+  protected static final String WORKER_CATEGORY = "_k8s_worker_category";
 
   public KubernetesTaskRunner(
       TaskAdapter adapter,
@@ -356,7 +357,7 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
   @Override
   public Map<String, Long> getTotalTaskSlotCount()
   {
-    return ImmutableMap.of("taskQueue", (long) config.getCapacity());
+    return ImmutableMap.of(WORKER_CATEGORY, (long) config.getCapacity());
   }
 
   @Override
@@ -374,13 +375,13 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
   @Override
   public Map<String, Long> getIdleTaskSlotCount()
   {
-    return Collections.emptyMap();
+    return ImmutableMap.of(WORKER_CATEGORY, (long) Math.max(0, 
config.getCapacity() - tasks.size()));
   }
 
   @Override
   public Map<String, Long> getUsedTaskSlotCount()
   {
-    return Collections.emptyMap();
+    return ImmutableMap.of(WORKER_CATEGORY, (long) 
Math.min(config.getCapacity(), tasks.size()));
   }
 
   @Override
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
index 76698ba8fe..92fc220e62 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
@@ -92,7 +92,8 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
     KubernetesPeonClient peonClient = new KubernetesPeonClient(
         druidKubernetesClient,
         kubernetesTaskRunnerConfig.getNamespace(),
-        kubernetesTaskRunnerConfig.isDebugJobs()
+        kubernetesTaskRunnerConfig.isDebugJobs(),
+        emitter
     );
 
     runner = new KubernetesTaskRunner(
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
index 4ef3a7bdaf..320bffba94 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
@@ -25,8 +25,12 @@ import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.dsl.LogWatch;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 
 import java.io.InputStream;
 import java.sql.Timestamp;
@@ -42,15 +46,22 @@ public class KubernetesPeonClient
   private final KubernetesClientApi clientApi;
   private final String namespace;
   private final boolean debugJobs;
+  private final ServiceEmitter emitter;
 
-  public KubernetesPeonClient(KubernetesClientApi clientApi, String namespace, 
boolean debugJobs)
+  public KubernetesPeonClient(
+      KubernetesClientApi clientApi,
+      String namespace,
+      boolean debugJobs,
+      ServiceEmitter emitter
+  )
   {
     this.clientApi = clientApi;
     this.namespace = namespace;
     this.debugJobs = debugJobs;
+    this.emitter = emitter;
   }
 
-  public Pod launchPeonJobAndWaitForStart(Job job, long howLong, TimeUnit 
timeUnit)
+  public Pod launchPeonJobAndWaitForStart(Job job, Task task, long howLong, 
TimeUnit timeUnit)
   {
     long start = System.currentTimeMillis();
     // launch job
@@ -69,6 +80,7 @@ public class KubernetesPeonClient
                          }, howLong, timeUnit);
       long duration = System.currentTimeMillis() - start;
       log.info("Took task %s %d ms for pod to startup", jobName, duration);
+      emitK8sPodMetrics(task, "k8s/peon/startup/time", duration);
       return result;
     });
   }
@@ -254,4 +266,11 @@ public class KubernetesPeonClient
       throw new KubernetesResourceNotFoundException("K8s pod with label: 
job-name=" + jobName + " not found");
     }
   }
+
+  private void emitK8sPodMetrics(Task task, String metric, long durationMs)
+  {
+    ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
+    IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+    emitter.emit(metricBuilder.build(metric, durationMs));
+  }
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 980a425a85..3a017e5f74 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -100,6 +100,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
 
     EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart(
         EasyMock.eq(job),
+        EasyMock.eq(task),
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
     )).andReturn(null);
@@ -144,6 +145,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
 
     EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart(
         EasyMock.eq(job),
+        EasyMock.eq(task),
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
     )).andReturn(null);
@@ -192,6 +194,7 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
 
     EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart(
         EasyMock.eq(job),
+        EasyMock.eq(task),
         EasyMock.anyLong(),
         EasyMock.eq(TimeUnit.MILLISECONDS)
     )).andReturn(null);
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 0359488802..613e3b1031 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -367,7 +367,7 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     MatcherAssert.assertThat(slotCount, Matchers.allOf(
         Matchers.aMapWithSize(1),
         Matchers.hasEntry(
-            Matchers.equalTo("taskQueue"),
+            Matchers.equalTo(KubernetesTaskRunner.WORKER_CATEGORY),
             Matchers.equalTo(1L)
         )
     ));
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
index f72ebef1f6..f6096b675d 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
@@ -30,7 +30,10 @@ import 
io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
 import io.fabric8.kubernetes.client.dsl.LogWatch;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -38,6 +41,8 @@ import org.junit.jupiter.api.Test;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -54,12 +59,23 @@ public class KubernetesPeonClientTest
   private KubernetesMockServer server;
   private KubernetesClientApi clientApi;
   private KubernetesPeonClient instance;
+  private ServiceEmitter serviceEmitter;
+  private Collection<Event> events;
 
   @BeforeEach
   public void setup()
   {
     clientApi = new TestKubernetesClient(this.client);
-    instance = new KubernetesPeonClient(clientApi, NAMESPACE, false);
+    events = new ArrayList<>();
+    serviceEmitter = new ServiceEmitter("service", "host", null)
+    {
+      @Override
+      public void emit(Event event)
+      {
+        events.add(event);
+      }
+    };
+    instance = new KubernetesPeonClient(clientApi, NAMESPACE, false, 
serviceEmitter);
   }
 
   @Test
@@ -83,9 +99,10 @@ public class KubernetesPeonClientTest
 
     client.pods().inNamespace(NAMESPACE).resource(pod).create();
 
-    Pod peonPod = instance.launchPeonJobAndWaitForStart(job, 1, 
TimeUnit.SECONDS);
+    Pod peonPod = instance.launchPeonJobAndWaitForStart(job, 
NoopTask.create(), 1, TimeUnit.SECONDS);
 
     Assertions.assertNotNull(peonPod);
+    Assertions.assertEquals(1, events.size());
   }
 
   @Test
@@ -111,7 +128,7 @@ public class KubernetesPeonClientTest
 
     Assertions.assertThrows(
         KubernetesClientTimeoutException.class,
-        () -> instance.launchPeonJobAndWaitForStart(job, 1, TimeUnit.SECONDS)
+        () -> instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, 
TimeUnit.SECONDS)
     );
   }
 
@@ -204,7 +221,8 @@ public class KubernetesPeonClientTest
     KubernetesPeonClient instance = new KubernetesPeonClient(
         new TestKubernetesClient(this.client),
         NAMESPACE,
-        true
+        true,
+        serviceEmitter
     );
 
     Job job = new JobBuilder()
@@ -228,7 +246,8 @@ public class KubernetesPeonClientTest
     KubernetesPeonClient instance = new KubernetesPeonClient(
         new TestKubernetesClient(this.client),
         NAMESPACE,
-        true
+        true,
+        serviceEmitter
     );
 
     Assertions.assertTrue(instance.deletePeonJob(new K8sTaskId(ID)));
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
index 327e4276d1..22e2311bbb 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
@@ -34,6 +34,8 @@ import 
org.apache.druid.indexing.common.config.TaskConfigBuilder;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.Task;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
 import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
 import org.apache.druid.k8s.overlord.common.JobResponse;
@@ -90,7 +92,14 @@ public class DruidPeonClientIntegrationTest
         new NamedType(IndexTask.IndexTuningConfig.class, "index")
     );
     k8sClient = new DruidKubernetesClient();
-    peonClient = new KubernetesPeonClient(k8sClient, "default", false);
+    ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", null)
+    {
+      @Override
+      public void emit(Event event)
+      {
+      }
+    };
+    peonClient = new KubernetesPeonClient(k8sClient, "default", false, 
serviceEmitter);
     druidNode = new DruidNode(
         "test",
         null,
@@ -130,7 +139,7 @@ public class DruidPeonClientIntegrationTest
     Job job = adapter.createJobFromPodSpec(podSpec, task, context);
 
     // launch the job and wait to start...
-    peonClient.launchPeonJobAndWaitForStart(job, 1, TimeUnit.MINUTES);
+    peonClient.launchPeonJobAndWaitForStart(job, task, 1, TimeUnit.MINUTES);
 
     // there should be one job that is a k8s peon job that exists
     List<Job> jobs = peonClient.getPeonJobs();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to