This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2e53fa0  KAFKA-7792: Add simple /agent/uptime and /coordinator/uptime 
health check endpoints (#6130)
2e53fa0 is described below

commit 2e53fa08af6aba8200f4c7846a8e8e568b56560d
Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com>
AuthorDate: Tue Jan 15 21:52:48 2019 +0200

    KAFKA-7792: Add simple /agent/uptime and /coordinator/uptime health check 
endpoints (#6130)
    
    Reviewed-by: Colin P. McCabe <cmcc...@apache.org>
---
 .../java/org/apache/kafka/trogdor/agent/Agent.java | 11 ++++-
 .../apache/kafka/trogdor/agent/AgentClient.java    | 16 ++++++++
 .../kafka/trogdor/agent/AgentRestResource.java     |  7 ++++
 .../kafka/trogdor/coordinator/Coordinator.java     | 13 +++++-
 .../trogdor/coordinator/CoordinatorClient.java     | 16 ++++++++
 .../coordinator/CoordinatorRestResource.java       |  7 ++++
 .../apache/kafka/trogdor/rest/UptimeResponse.java  | 47 ++++++++++++++++++++++
 .../org/apache/kafka/trogdor/agent/AgentTest.java  | 23 ++++++++++-
 .../kafka/trogdor/coordinator/CoordinatorTest.java | 25 ++++++++++--
 9 files changed, 156 insertions(+), 9 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java 
b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
index c76ef26..699e14b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
@@ -23,6 +23,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Scheduler;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
@@ -30,6 +31,7 @@ import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
 import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
+import org.apache.kafka.trogdor.rest.UptimeResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +62,8 @@ public final class Agent {
      */
     private final JsonRestServer restServer;
 
+    private final Time time;
+
     /**
      * Create a new Agent.
      *
@@ -70,7 +74,8 @@ public final class Agent {
      */
     public Agent(Platform platform, Scheduler scheduler,
                  JsonRestServer restServer, AgentRestResource resource) {
-        this.serverStartMs = scheduler.time().milliseconds();
+        this.time = scheduler.time();
+        this.serverStartMs = time.milliseconds();
         this.workerManager = new WorkerManager(platform, scheduler);
         this.restServer = restServer;
         resource.setAgent(this);
@@ -94,6 +99,10 @@ public final class Agent {
         return new AgentStatusResponse(serverStartMs, 
workerManager.workerStates());
     }
 
+    public UptimeResponse uptime() {
+        return new UptimeResponse(serverStartMs, time.milliseconds());
+    }
+
     public void createWorker(CreateWorkerRequest req) throws Throwable {
         workerManager.createWorker(req.workerId(), req.taskId(), req.spec());
     }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java 
b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
index c89011b..55c3e7b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
@@ -32,6 +32,7 @@ import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
+import org.apache.kafka.trogdor.rest.UptimeResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,6 +118,13 @@ public class AgentClient {
         return resp.body();
     }
 
+    public UptimeResponse uptime() throws Exception {
+        HttpResponse<UptimeResponse> resp =
+            JsonRestServer.httpRequest(url("/agent/uptime"), "GET",
+                null, new TypeReference<UptimeResponse>() { }, maxTries);
+        return resp.body();
+    }
+
     public void createWorker(CreateWorkerRequest request) throws Exception {
         HttpResponse<Empty> resp =
             JsonRestServer.<Empty>httpRequest(
@@ -168,6 +176,11 @@ public class AgentClient {
             .type(Boolean.class)
             .dest("status")
             .help("Get agent status.");
+        actions.addArgument("--uptime")
+            .action(storeTrue())
+            .type(Boolean.class)
+            .dest("uptime")
+            .help("Get agent uptime.");
         actions.addArgument("--create-worker")
             .action(store())
             .type(String.class)
@@ -212,6 +225,9 @@ public class AgentClient {
         if (res.getBoolean("status")) {
             System.out.println("Got agent status: " +
                 JsonUtil.toPrettyJsonString(client.status()));
+        } else if (res.getBoolean("uptime")) {
+            System.out.println("Got agent uptime: " +
+                JsonUtil.toPrettyJsonString(client.uptime()));
         } else if (res.getString("create_worker") != null) {
             CreateWorkerRequest req = JsonUtil.JSON_SERDE.
                 readValue(res.getString("create_worker"), 
CreateWorkerRequest.class);
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java 
b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
index 1f2ad49..ec3df8b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
@@ -21,6 +21,7 @@ import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
 import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
+import org.apache.kafka.trogdor.rest.UptimeResponse;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.Consumes;
@@ -65,6 +66,12 @@ public class AgentRestResource {
         return agent().status();
     }
 
+    @GET
+    @Path("/uptime")
+    public UptimeResponse uptime() {
+        return agent().uptime();
+    }
+
     @POST
     @Path("/worker/create")
     public Empty createWorker(CreateWorkerRequest req) throws Throwable {
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index a41a6f2..867de55 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -23,6 +23,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Scheduler;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
@@ -31,9 +32,10 @@ import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
 import org.apache.kafka.trogdor.rest.TaskRequest;
-import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TaskState;
+import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
+import org.apache.kafka.trogdor.rest.UptimeResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +68,8 @@ public final class Coordinator {
      */
     private final JsonRestServer restServer;
 
+    private final Time time;
+
     /**
      * Create a new Coordinator.
      *
@@ -76,7 +80,8 @@ public final class Coordinator {
      */
     public Coordinator(Platform platform, Scheduler scheduler, JsonRestServer 
restServer,
                        CoordinatorRestResource resource, long firstWorkerId) {
-        this.startTimeMs = scheduler.time().milliseconds();
+        this.time = scheduler.time();
+        this.startTimeMs = time.milliseconds();
         this.taskManager = new TaskManager(platform, scheduler, firstWorkerId);
         this.restServer = restServer;
         resource.setCoordinator(this);
@@ -90,6 +95,10 @@ public final class Coordinator {
         return new CoordinatorStatusResponse(startTimeMs);
     }
 
+    public UptimeResponse uptime() {
+        return new UptimeResponse(startTimeMs, time.milliseconds());
+    }
+
     public void createTask(CreateTaskRequest request) throws Throwable {
         taskManager.createTask(request.id(), request.spec());
     }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 4670d2c..3765b3b 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -36,6 +36,7 @@ import org.apache.kafka.trogdor.rest.TaskRequest;
 import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TasksResponse;
+import org.apache.kafka.trogdor.rest.UptimeResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,6 +121,13 @@ public class CoordinatorClient {
         return resp.body();
     }
 
+    public UptimeResponse uptime() throws Exception {
+        HttpResponse<UptimeResponse> resp =
+            JsonRestServer.httpRequest(url("/coordinator/uptime"), "GET",
+                null, new TypeReference<UptimeResponse>() { }, maxTries);
+        return resp.body();
+    }
+
     public void createTask(CreateTaskRequest request) throws Exception {
         HttpResponse<Empty> resp =
             JsonRestServer.httpRequest(log, url("/coordinator/task/create"), 
"POST",
@@ -188,6 +196,11 @@ public class CoordinatorClient {
             .type(Boolean.class)
             .dest("status")
             .help("Get coordinator status.");
+        actions.addArgument("--uptime")
+            .action(storeTrue())
+            .type(Boolean.class)
+            .dest("uptime")
+            .help("Get coordinator uptime.");
         actions.addArgument("--show-tasks")
             .action(storeTrue())
             .type(Boolean.class)
@@ -243,6 +256,9 @@ public class CoordinatorClient {
         if (res.getBoolean("status")) {
             System.out.println("Got coordinator status: " +
                 JsonUtil.toPrettyJsonString(client.status()));
+        } else if (res.getBoolean("uptime")) {
+            System.out.println("Got coordinator uptime: " +
+                JsonUtil.toPrettyJsonString(client.uptime()));
         } else if (res.getBoolean("show_tasks")) {
             System.out.println("Got coordinator tasks: " +
                 JsonUtil.toPrettyJsonString(client.tasks(
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
index 91f731e..337f2b4 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
@@ -27,6 +27,7 @@ import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TaskStateType;
 import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
+import org.apache.kafka.trogdor.rest.UptimeResponse;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.Consumes;
@@ -77,6 +78,12 @@ public class CoordinatorRestResource {
         return coordinator().status();
     }
 
+    @GET
+    @Path("/uptime")
+    public UptimeResponse uptime() {
+        return coordinator().uptime();
+    }
+
     @POST
     @Path("/task/create")
     public Empty createTask(CreateTaskRequest request) throws Throwable {
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/UptimeResponse.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/UptimeResponse.java
new file mode 100644
index 0000000..51393b1
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/UptimeResponse.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A response from the Trogdor Agent/Coordinator about how long it has been 
running
+ */
+public class UptimeResponse extends Message {
+
+    private long serverStartMs;
+    private long nowMs;
+
+    @JsonCreator
+    public UptimeResponse(@JsonProperty("serverStartMs") long serverStartMs,
+                          @JsonProperty("nowMs") long nowMs) {
+        this.serverStartMs = serverStartMs;
+        this.nowMs = nowMs;
+    }
+
+    @JsonProperty
+    public long serverStartMs() {
+        return serverStartMs;
+    }
+
+    @JsonProperty
+    public long nowMs() {
+        return nowMs;
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java 
b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index 6c20083..425fe65 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -34,21 +34,21 @@ import org.apache.kafka.trogdor.fault.Kibosh;
 import org.apache.kafka.trogdor.fault.Kibosh.KiboshControlFile;
 import org.apache.kafka.trogdor.fault.Kibosh.KiboshFilesUnreadableFaultSpec;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
-
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
 import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
 import org.apache.kafka.trogdor.rest.TaskDone;
+import org.apache.kafka.trogdor.rest.UptimeResponse;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.task.NoOpTaskSpec;
 import org.apache.kafka.trogdor.task.SampleTaskSpec;
 import org.junit.Assert;
 import org.junit.Rule;
-import org.junit.rules.Timeout;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.io.File;
 import java.io.IOException;
@@ -60,6 +60,7 @@ import java.util.HashMap;
 import java.util.TreeMap;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 
 public class AgentTest {
     @Rule
@@ -132,6 +133,7 @@ public class AgentTest {
         AgentClient client = new AgentClient.Builder().
             maxTries(10).target("localhost", agent.port()).build();
         AgentStatusResponse status = client.status();
+
         assertEquals(Collections.emptyMap(), status.workers());
         new ExpectedTasks().waitFor(client);
 
@@ -148,6 +150,23 @@ public class AgentTest {
     }
 
     @Test
+    public void testAgentGetUptime() throws Exception {
+        MockTime time = new MockTime(0, 111, 0);
+        MockScheduler scheduler = new MockScheduler(time);
+        Agent agent = createAgent(scheduler);
+        AgentClient client = new AgentClient.Builder().
+            maxTries(10).target("localhost", agent.port()).build();
+
+        UptimeResponse uptime = client.uptime();
+        assertEquals(agent.uptime(), uptime);
+
+        time.setCurrentTimeMs(150);
+        assertNotEquals(agent.uptime(), uptime);
+        agent.beginShutdown();
+        agent.waitForShutdown();
+    }
+
+    @Test
     public void testAgentCreateWorkers() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
         MockScheduler scheduler = new MockScheduler(time);
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java 
b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index 0247951..660f3cd 100644
--- 
a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.trogdor.common.CapturingCommandRunner;
 import org.apache.kafka.trogdor.common.ExpectedTasks;
 import org.apache.kafka.trogdor.common.ExpectedTasks.ExpectedTaskBuilder;
 import org.apache.kafka.trogdor.common.MiniTrogdorCluster;
-
 import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateTaskRequest;
@@ -39,21 +38,22 @@ import 
org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
 import org.apache.kafka.trogdor.rest.TaskDone;
 import org.apache.kafka.trogdor.rest.TaskPending;
-import org.apache.kafka.trogdor.rest.TaskRunning;
 import org.apache.kafka.trogdor.rest.TaskRequest;
+import org.apache.kafka.trogdor.rest.TaskRunning;
+import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TaskStateType;
 import org.apache.kafka.trogdor.rest.TasksRequest;
-import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TasksResponse;
+import org.apache.kafka.trogdor.rest.UptimeResponse;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.task.NoOpTaskSpec;
 import org.apache.kafka.trogdor.task.SampleTaskSpec;
 import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.junit.Test;
 
 import javax.ws.rs.NotFoundException;
 import java.util.ArrayList;
@@ -63,6 +63,7 @@ import java.util.List;
 import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
@@ -84,6 +85,22 @@ public class CoordinatorTest {
     }
 
     @Test
+    public void testCoordinatorUptime() throws Exception {
+        MockTime time = new MockTime(0, 200, 0);
+        Scheduler scheduler = new MockScheduler(time);
+        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+            addCoordinator("node01").
+            scheduler(scheduler).
+            build()) {
+            UptimeResponse uptime = cluster.coordinatorClient().uptime();
+            assertEquals(cluster.coordinator().uptime(), uptime);
+
+            time.setCurrentTimeMs(250);
+            assertNotEquals(cluster.coordinator().uptime(), uptime);
+        }
+    }
+
+    @Test
     public void testCreateTask() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
         Scheduler scheduler = new MockScheduler(time);

Reply via email to