This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ecb71cf KAFKA-7564: Expose single task details in Trogdor (#5852) ecb71cf is described below commit ecb71cf4719e6d22d6738f8df2fd9e16dad33295 Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com> AuthorDate: Fri Nov 9 18:31:04 2018 +0000 KAFKA-7564: Expose single task details in Trogdor (#5852) This commit adds a new "/coordinator/tasks/{taskId}" endpoint which fetches details for a single task. --- .../kafka/trogdor/coordinator/Coordinator.java | 6 ++++ .../trogdor/coordinator/CoordinatorClient.java | 25 ++++++++++++++ .../coordinator/CoordinatorRestResource.java | 14 ++++++++ .../kafka/trogdor/coordinator/TaskManager.java | 31 ++++++++++++++++++ .../org/apache/kafka/trogdor/rest/TaskRequest.java | 38 ++++++++++++++++++++++ .../apache/kafka/trogdor/common/ExpectedTasks.java | 2 +- .../kafka/trogdor/coordinator/CoordinatorTest.java | 37 +++++++++++++++++++++ 7 files changed, 152 insertions(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java index c3271c9..cd3da90 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java @@ -30,7 +30,9 @@ import org.apache.kafka.trogdor.rest.CreateTaskRequest; import org.apache.kafka.trogdor.rest.DestroyTaskRequest; import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.StopTaskRequest; +import org.apache.kafka.trogdor.rest.TaskRequest; import org.apache.kafka.trogdor.rest.TasksRequest; +import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TasksResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,6 +106,10 @@ public final class Coordinator { return taskManager.tasks(request); } + public TaskState task(TaskRequest request) throws Exception { + return taskManager.task(request); + } + public void beginShutdown(boolean stopAgents) throws Exception { restServer.beginShutdown(); taskManager.beginShutdown(stopAgents); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java index 780ae73..80937a8 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java @@ -32,11 +32,14 @@ import org.apache.kafka.trogdor.rest.Empty; import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse; import org.apache.kafka.trogdor.rest.StopTaskRequest; +import org.apache.kafka.trogdor.rest.TaskRequest; import org.apache.kafka.trogdor.rest.TasksRequest; +import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TasksResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.NotFoundException; import javax.ws.rs.core.UriBuilder; import static net.sourceforge.argparse4j.impl.Arguments.store; @@ -151,6 +154,13 @@ public class CoordinatorClient { return resp.body(); } + public TaskState task(TaskRequest request) throws Exception { + String uri = UriBuilder.fromPath(url("/coordinator/tasks/{taskId}")).build(request.taskId()).toString(); + HttpResponse<TaskState> resp = JsonRestServer.httpRequest(log, uri, "GET", + null, new TypeReference<TaskState>() { }, maxTries); + return resp.body(); + } + public void shutdown() throws Exception { HttpResponse<Empty> resp = JsonRestServer.httpRequest(log, url("/coordinator/shutdown"), "PUT", @@ -181,6 +191,12 @@ public class CoordinatorClient { .type(Boolean.class) .dest("show_tasks") .help("Show coordinator tasks."); + actions.addArgument("--show-task") + .action(store()) + .type(String.class) + .dest("show_task") + .metavar("TASK_ID") + .help("Show a specific coordinator task."); actions.addArgument("--create-task") .action(store()) .type(String.class) @@ -229,6 +245,15 @@ public class CoordinatorClient { System.out.println("Got coordinator tasks: " + JsonUtil.toPrettyJsonString(client.tasks( new TasksRequest(null, 0, 0, 0, 0)))); + } else if (res.getString("show_task") != null) { + String taskId = res.getString("show_task"); + TaskRequest req = new TaskRequest(res.getString("show_task")); + try { + String taskOutput = String.format("Got coordinator task \"%s\": %s", taskId, JsonUtil.toPrettyJsonString(client.task(req))); + System.out.println(taskOutput); + } catch (NotFoundException e) { + System.out.println(e.getMessage()); + } } else if (res.getString("create_task") != null) { CreateTaskRequest req = JsonUtil.JSON_SERDE. readValue(res.getString("create_task"), CreateTaskRequest.class); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java index cbfbddd..9163720 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java @@ -22,7 +22,9 @@ import org.apache.kafka.trogdor.rest.CreateTaskRequest; import org.apache.kafka.trogdor.rest.DestroyTaskRequest; import org.apache.kafka.trogdor.rest.Empty; import org.apache.kafka.trogdor.rest.StopTaskRequest; +import org.apache.kafka.trogdor.rest.TaskRequest; import org.apache.kafka.trogdor.rest.TasksRequest; +import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TasksResponse; import javax.servlet.ServletContext; @@ -35,6 +37,8 @@ import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import javax.ws.rs.PathParam; +import javax.ws.rs.NotFoundException; import javax.ws.rs.core.MediaType; import java.util.List; import java.util.concurrent.atomic.AtomicReference; @@ -101,6 +105,16 @@ public class CoordinatorRestResource { return coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs)); } + @GET + @Path("/tasks/{taskId}") + public TaskState tasks(@PathParam("taskId") String taskId) throws Throwable { + TaskState response = coordinator().task(new TaskRequest(taskId)); + if (response == null) + throw new NotFoundException(String.format("No task with ID \"%s\" exists.", taskId)); + + return response; + } + @PUT @Path("/shutdown") public Empty beginShutdown(CoordinatorShutdownRequest request) throws Throwable { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java index 74082bd..934acd3 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java @@ -36,6 +36,7 @@ import org.apache.kafka.trogdor.rest.TaskRunning; import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TaskStopping; import org.apache.kafka.trogdor.rest.TasksRequest; +import org.apache.kafka.trogdor.rest.TaskRequest; import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerReceiving; @@ -629,6 +630,36 @@ public final class TaskManager { } /** + * Get information about a single task being managed. + * + * Returns #{@code null} if the task does not exist + */ + public TaskState task(TaskRequest request) throws ExecutionException, InterruptedException { + return executor.submit(new GetTaskState(request)).get(); + } + + /** + * Gets information about the tasks being managed. Processed by the state change thread. + */ + class GetTaskState implements Callable<TaskState> { + private final TaskRequest request; + + GetTaskState(TaskRequest request) { + this.request = request; + } + + @Override + public TaskState call() throws Exception { + ManagedTask task = tasks.get(request.taskId()); + if (task == null) { + return null; + } + + return task.taskState(); + } + } + + /** * Initiate shutdown, but do not wait for it to complete. */ public void beginShutdown(boolean stopAgents) throws ExecutionException, InterruptedException { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRequest.java new file mode 100644 index 0000000..e42738f --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRequest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * The request to /coordinator/tasks/{taskId} + */ +public class TaskRequest { + private final String taskId; + + @JsonCreator + public TaskRequest(@JsonProperty("taskId") String taskId) { + this.taskId = taskId == null ? "" : taskId; + } + + @JsonProperty + public String taskId() { + return taskId; + } +} diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java index 121281f..b0e30a0 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java @@ -71,7 +71,7 @@ public class ExpectedTasks { } } - static class ExpectedTask { + public static class ExpectedTask { private final String id; private final TaskSpec taskSpec; private final TaskState taskState; diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java index e943484..f22130e 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java @@ -41,7 +41,9 @@ import org.apache.kafka.trogdor.rest.StopTaskRequest; import org.apache.kafka.trogdor.rest.TaskDone; import org.apache.kafka.trogdor.rest.TaskPending; import org.apache.kafka.trogdor.rest.TaskRunning; +import org.apache.kafka.trogdor.rest.TaskRequest; import org.apache.kafka.trogdor.rest.TasksRequest; +import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerRunning; @@ -53,6 +55,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.junit.Test; +import javax.ws.rs.NotFoundException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -491,6 +494,40 @@ public class CoordinatorTest { } @Test + public void testTaskRequest() throws Exception { + MockTime time = new MockTime(0, 0, 0); + Scheduler scheduler = new MockScheduler(time); + try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder(). + addCoordinator("node01"). + addAgent("node02"). + scheduler(scheduler). + build()) { + CoordinatorClient coordinatorClient = cluster.coordinatorClient(); + + NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 10); + coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec)); + TaskState expectedState = new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build().taskState(); + + TaskState resp = coordinatorClient.task(new TaskRequest("foo")); + assertEquals(expectedState, resp); + + time.sleep(2); + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))). + workerState(new WorkerRunning("foo", fooSpec, 2, new TextNode("active"))). + build()). + waitFor(coordinatorClient). + waitFor(cluster.agentClient("node02")); + + try { + coordinatorClient.task(new TaskRequest("non-existent-foo")); + fail("Non existent task request should have raised a NotFoundException"); + } catch (NotFoundException ignored) { } + } + } + + @Test public void testWorkersExitingAtDifferentTimes() throws Exception { MockTime time = new MockTime(0, 0, 0); Scheduler scheduler = new MockScheduler(time);