This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 2e53fa0 KAFKA-7792: Add simple /agent/uptime and /coordinator/uptime health check endpoints (#6130) 2e53fa0 is described below commit 2e53fa08af6aba8200f4c7846a8e8e568b56560d Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com> AuthorDate: Tue Jan 15 21:52:48 2019 +0200 KAFKA-7792: Add simple /agent/uptime and /coordinator/uptime health check endpoints (#6130) Reviewed-by: Colin P. McCabe <cmcc...@apache.org> --- .../java/org/apache/kafka/trogdor/agent/Agent.java | 11 ++++- .../apache/kafka/trogdor/agent/AgentClient.java | 16 ++++++++ .../kafka/trogdor/agent/AgentRestResource.java | 7 ++++ .../kafka/trogdor/coordinator/Coordinator.java | 13 +++++- .../trogdor/coordinator/CoordinatorClient.java | 16 ++++++++ .../coordinator/CoordinatorRestResource.java | 7 ++++ .../apache/kafka/trogdor/rest/UptimeResponse.java | 47 ++++++++++++++++++++++ .../org/apache/kafka/trogdor/agent/AgentTest.java | 23 ++++++++++- .../kafka/trogdor/coordinator/CoordinatorTest.java | 25 ++++++++++-- 9 files changed, 156 insertions(+), 9 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java index c76ef26..699e14b 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java @@ -23,6 +23,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.Namespace; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Scheduler; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.trogdor.common.Node; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.rest.AgentStatusResponse; @@ -30,6 +31,7 @@ import org.apache.kafka.trogdor.rest.CreateWorkerRequest; import org.apache.kafka.trogdor.rest.DestroyWorkerRequest; import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.StopWorkerRequest; +import org.apache.kafka.trogdor.rest.UptimeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +62,8 @@ public final class Agent { */ private final JsonRestServer restServer; + private final Time time; + /** * Create a new Agent. * @@ -70,7 +74,8 @@ public final class Agent { */ public Agent(Platform platform, Scheduler scheduler, JsonRestServer restServer, AgentRestResource resource) { - this.serverStartMs = scheduler.time().milliseconds(); + this.time = scheduler.time(); + this.serverStartMs = time.milliseconds(); this.workerManager = new WorkerManager(platform, scheduler); this.restServer = restServer; resource.setAgent(this); @@ -94,6 +99,10 @@ public final class Agent { return new AgentStatusResponse(serverStartMs, workerManager.workerStates()); } + public UptimeResponse uptime() { + return new UptimeResponse(serverStartMs, time.milliseconds()); + } + public void createWorker(CreateWorkerRequest req) throws Throwable { workerManager.createWorker(req.workerId(), req.taskId(), req.spec()); } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java index c89011b..55c3e7b 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java @@ -32,6 +32,7 @@ import org.apache.kafka.trogdor.rest.Empty; import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse; import org.apache.kafka.trogdor.rest.StopWorkerRequest; +import org.apache.kafka.trogdor.rest.UptimeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,6 +118,13 @@ public class AgentClient { return resp.body(); } + public UptimeResponse uptime() throws Exception { + HttpResponse<UptimeResponse> resp = + JsonRestServer.httpRequest(url("/agent/uptime"), "GET", + null, new TypeReference<UptimeResponse>() { }, maxTries); + return resp.body(); + } + public void createWorker(CreateWorkerRequest request) throws Exception { HttpResponse<Empty> resp = JsonRestServer.<Empty>httpRequest( @@ -168,6 +176,11 @@ public class AgentClient { .type(Boolean.class) .dest("status") .help("Get agent status."); + actions.addArgument("--uptime") + .action(storeTrue()) + .type(Boolean.class) + .dest("uptime") + .help("Get agent uptime."); actions.addArgument("--create-worker") .action(store()) .type(String.class) @@ -212,6 +225,9 @@ public class AgentClient { if (res.getBoolean("status")) { System.out.println("Got agent status: " + JsonUtil.toPrettyJsonString(client.status())); + } else if (res.getBoolean("uptime")) { + System.out.println("Got agent uptime: " + + JsonUtil.toPrettyJsonString(client.uptime())); } else if (res.getString("create_worker") != null) { CreateWorkerRequest req = JsonUtil.JSON_SERDE. readValue(res.getString("create_worker"), CreateWorkerRequest.class); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java index 1f2ad49..ec3df8b 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java @@ -21,6 +21,7 @@ import org.apache.kafka.trogdor.rest.CreateWorkerRequest; import org.apache.kafka.trogdor.rest.DestroyWorkerRequest; import org.apache.kafka.trogdor.rest.Empty; import org.apache.kafka.trogdor.rest.StopWorkerRequest; +import org.apache.kafka.trogdor.rest.UptimeResponse; import javax.servlet.ServletContext; import javax.ws.rs.Consumes; @@ -65,6 +66,12 @@ public class AgentRestResource { return agent().status(); } + @GET + @Path("/uptime") + public UptimeResponse uptime() { + return agent().uptime(); + } + @POST @Path("/worker/create") public Empty createWorker(CreateWorkerRequest req) throws Throwable { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java index a41a6f2..867de55 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java @@ -23,6 +23,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.Namespace; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Scheduler; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.trogdor.common.Node; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse; @@ -31,9 +32,10 @@ import org.apache.kafka.trogdor.rest.DestroyTaskRequest; import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.StopTaskRequest; import org.apache.kafka.trogdor.rest.TaskRequest; -import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TaskState; +import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TasksResponse; +import org.apache.kafka.trogdor.rest.UptimeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +68,8 @@ public final class Coordinator { */ private final JsonRestServer restServer; + private final Time time; + /** * Create a new Coordinator. * @@ -76,7 +80,8 @@ public final class Coordinator { */ public Coordinator(Platform platform, Scheduler scheduler, JsonRestServer restServer, CoordinatorRestResource resource, long firstWorkerId) { - this.startTimeMs = scheduler.time().milliseconds(); + this.time = scheduler.time(); + this.startTimeMs = time.milliseconds(); this.taskManager = new TaskManager(platform, scheduler, firstWorkerId); this.restServer = restServer; resource.setCoordinator(this); @@ -90,6 +95,10 @@ public final class Coordinator { return new CoordinatorStatusResponse(startTimeMs); } + public UptimeResponse uptime() { + return new UptimeResponse(startTimeMs, time.milliseconds()); + } + public void createTask(CreateTaskRequest request) throws Throwable { taskManager.createTask(request.id(), request.spec()); } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java index 4670d2c..3765b3b 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java @@ -36,6 +36,7 @@ import org.apache.kafka.trogdor.rest.TaskRequest; import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TasksResponse; +import org.apache.kafka.trogdor.rest.UptimeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,6 +121,13 @@ public class CoordinatorClient { return resp.body(); } + public UptimeResponse uptime() throws Exception { + HttpResponse<UptimeResponse> resp = + JsonRestServer.httpRequest(url("/coordinator/uptime"), "GET", + null, new TypeReference<UptimeResponse>() { }, maxTries); + return resp.body(); + } + public void createTask(CreateTaskRequest request) throws Exception { HttpResponse<Empty> resp = JsonRestServer.httpRequest(log, url("/coordinator/task/create"), "POST", @@ -188,6 +196,11 @@ public class CoordinatorClient { .type(Boolean.class) .dest("status") .help("Get coordinator status."); + actions.addArgument("--uptime") + .action(storeTrue()) + .type(Boolean.class) + .dest("uptime") + .help("Get coordinator uptime."); actions.addArgument("--show-tasks") .action(storeTrue()) .type(Boolean.class) @@ -243,6 +256,9 @@ public class CoordinatorClient { if (res.getBoolean("status")) { System.out.println("Got coordinator status: " + JsonUtil.toPrettyJsonString(client.status())); + } else if (res.getBoolean("uptime")) { + System.out.println("Got coordinator uptime: " + + JsonUtil.toPrettyJsonString(client.uptime())); } else if (res.getBoolean("show_tasks")) { System.out.println("Got coordinator tasks: " + JsonUtil.toPrettyJsonString(client.tasks( diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java index 91f731e..337f2b4 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java @@ -27,6 +27,7 @@ import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TaskStateType; import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TasksResponse; +import org.apache.kafka.trogdor.rest.UptimeResponse; import javax.servlet.ServletContext; import javax.ws.rs.Consumes; @@ -77,6 +78,12 @@ public class CoordinatorRestResource { return coordinator().status(); } + @GET + @Path("/uptime") + public UptimeResponse uptime() { + return coordinator().uptime(); + } + @POST @Path("/task/create") public Empty createTask(CreateTaskRequest request) throws Throwable { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/UptimeResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/UptimeResponse.java new file mode 100644 index 0000000..51393b1 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/UptimeResponse.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A response from the Trogdor Agent/Coordinator about how long it has been running + */ +public class UptimeResponse extends Message { + + private long serverStartMs; + private long nowMs; + + @JsonCreator + public UptimeResponse(@JsonProperty("serverStartMs") long serverStartMs, + @JsonProperty("nowMs") long nowMs) { + this.serverStartMs = serverStartMs; + this.nowMs = nowMs; + } + + @JsonProperty + public long serverStartMs() { + return serverStartMs; + } + + @JsonProperty + public long nowMs() { + return nowMs; + } +} diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java index 6c20083..425fe65 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java @@ -34,21 +34,21 @@ import org.apache.kafka.trogdor.fault.Kibosh; import org.apache.kafka.trogdor.fault.Kibosh.KiboshControlFile; import org.apache.kafka.trogdor.fault.Kibosh.KiboshFilesUnreadableFaultSpec; import org.apache.kafka.trogdor.rest.AgentStatusResponse; - import org.apache.kafka.trogdor.rest.CreateWorkerRequest; import org.apache.kafka.trogdor.rest.DestroyWorkerRequest; import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.RequestConflictException; import org.apache.kafka.trogdor.rest.StopWorkerRequest; import org.apache.kafka.trogdor.rest.TaskDone; +import org.apache.kafka.trogdor.rest.UptimeResponse; import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.task.NoOpTaskSpec; import org.apache.kafka.trogdor.task.SampleTaskSpec; import org.junit.Assert; import org.junit.Rule; -import org.junit.rules.Timeout; import org.junit.Test; +import org.junit.rules.Timeout; import java.io.File; import java.io.IOException; @@ -60,6 +60,7 @@ import java.util.HashMap; import java.util.TreeMap; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; public class AgentTest { @Rule @@ -132,6 +133,7 @@ public class AgentTest { AgentClient client = new AgentClient.Builder(). maxTries(10).target("localhost", agent.port()).build(); AgentStatusResponse status = client.status(); + assertEquals(Collections.emptyMap(), status.workers()); new ExpectedTasks().waitFor(client); @@ -148,6 +150,23 @@ public class AgentTest { } @Test + public void testAgentGetUptime() throws Exception { + MockTime time = new MockTime(0, 111, 0); + MockScheduler scheduler = new MockScheduler(time); + Agent agent = createAgent(scheduler); + AgentClient client = new AgentClient.Builder(). + maxTries(10).target("localhost", agent.port()).build(); + + UptimeResponse uptime = client.uptime(); + assertEquals(agent.uptime(), uptime); + + time.setCurrentTimeMs(150); + assertNotEquals(agent.uptime(), uptime); + agent.beginShutdown(); + agent.waitForShutdown(); + } + + @Test public void testAgentCreateWorkers() throws Exception { MockTime time = new MockTime(0, 0, 0); MockScheduler scheduler = new MockScheduler(time); diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java index 0247951..660f3cd 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java @@ -30,7 +30,6 @@ import org.apache.kafka.trogdor.common.CapturingCommandRunner; import org.apache.kafka.trogdor.common.ExpectedTasks; import org.apache.kafka.trogdor.common.ExpectedTasks.ExpectedTaskBuilder; import org.apache.kafka.trogdor.common.MiniTrogdorCluster; - import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec; import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse; import org.apache.kafka.trogdor.rest.CreateTaskRequest; @@ -39,21 +38,22 @@ import org.apache.kafka.trogdor.rest.RequestConflictException; import org.apache.kafka.trogdor.rest.StopTaskRequest; import org.apache.kafka.trogdor.rest.TaskDone; import org.apache.kafka.trogdor.rest.TaskPending; -import org.apache.kafka.trogdor.rest.TaskRunning; import org.apache.kafka.trogdor.rest.TaskRequest; +import org.apache.kafka.trogdor.rest.TaskRunning; +import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TaskStateType; import org.apache.kafka.trogdor.rest.TasksRequest; -import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TasksResponse; +import org.apache.kafka.trogdor.rest.UptimeResponse; import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.task.NoOpTaskSpec; import org.apache.kafka.trogdor.task.SampleTaskSpec; import org.junit.Rule; +import org.junit.Test; import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.junit.Test; import javax.ws.rs.NotFoundException; import java.util.ArrayList; @@ -63,6 +63,7 @@ import java.util.List; import java.util.Optional; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; @@ -84,6 +85,22 @@ public class CoordinatorTest { } @Test + public void testCoordinatorUptime() throws Exception { + MockTime time = new MockTime(0, 200, 0); + Scheduler scheduler = new MockScheduler(time); + try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder(). + addCoordinator("node01"). + scheduler(scheduler). + build()) { + UptimeResponse uptime = cluster.coordinatorClient().uptime(); + assertEquals(cluster.coordinator().uptime(), uptime); + + time.setCurrentTimeMs(250); + assertNotEquals(cluster.coordinator().uptime(), uptime); + } + } + + @Test public void testCreateTask() throws Exception { MockTime time = new MockTime(0, 0, 0); Scheduler scheduler = new MockScheduler(time);