This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ecb71cf  KAFKA-7564: Expose single task details in Trogdor (#5852)
ecb71cf is described below

commit ecb71cf4719e6d22d6738f8df2fd9e16dad33295
Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com>
AuthorDate: Fri Nov 9 18:31:04 2018 +0000

    KAFKA-7564: Expose single task details in Trogdor (#5852)
    
    This commit adds a new "/coordinator/tasks/{taskId}" endpoint which fetches 
details for a single task.
---
 .../kafka/trogdor/coordinator/Coordinator.java     |  6 ++++
 .../trogdor/coordinator/CoordinatorClient.java     | 25 ++++++++++++++
 .../coordinator/CoordinatorRestResource.java       | 14 ++++++++
 .../kafka/trogdor/coordinator/TaskManager.java     | 31 ++++++++++++++++++
 .../org/apache/kafka/trogdor/rest/TaskRequest.java | 38 ++++++++++++++++++++++
 .../apache/kafka/trogdor/common/ExpectedTasks.java |  2 +-
 .../kafka/trogdor/coordinator/CoordinatorTest.java | 37 +++++++++++++++++++++
 7 files changed, 152 insertions(+), 1 deletion(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index c3271c9..cd3da90 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -30,7 +30,9 @@ import org.apache.kafka.trogdor.rest.CreateTaskRequest;
 import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.TaskRequest;
 import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,6 +106,10 @@ public final class Coordinator {
         return taskManager.tasks(request);
     }
 
+    public TaskState task(TaskRequest request) throws Exception {
+        return taskManager.task(request);
+    }
+
     public void beginShutdown(boolean stopAgents) throws Exception {
         restServer.beginShutdown();
         taskManager.beginShutdown(stopAgents);
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 780ae73..80937a8 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -32,11 +32,14 @@ import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.TaskRequest;
 import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.core.UriBuilder;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
@@ -151,6 +154,13 @@ public class CoordinatorClient {
         return resp.body();
     }
 
+    public TaskState task(TaskRequest request) throws Exception {
+        String uri = 
UriBuilder.fromPath(url("/coordinator/tasks/{taskId}")).build(request.taskId()).toString();
+        HttpResponse<TaskState> resp = JsonRestServer.httpRequest(log, uri, 
"GET",
+            null, new TypeReference<TaskState>() { }, maxTries);
+        return resp.body();
+    }
+
     public void shutdown() throws Exception {
         HttpResponse<Empty> resp =
             JsonRestServer.httpRequest(log, url("/coordinator/shutdown"), 
"PUT",
@@ -181,6 +191,12 @@ public class CoordinatorClient {
             .type(Boolean.class)
             .dest("show_tasks")
             .help("Show coordinator tasks.");
+        actions.addArgument("--show-task")
+            .action(store())
+            .type(String.class)
+            .dest("show_task")
+            .metavar("TASK_ID")
+            .help("Show a specific coordinator task.");
         actions.addArgument("--create-task")
             .action(store())
             .type(String.class)
@@ -229,6 +245,15 @@ public class CoordinatorClient {
             System.out.println("Got coordinator tasks: " +
                 JsonUtil.toPrettyJsonString(client.tasks(
                     new TasksRequest(null, 0, 0, 0, 0))));
+        } else if (res.getString("show_task") != null) {
+            String taskId = res.getString("show_task");
+            TaskRequest req = new TaskRequest(res.getString("show_task"));
+            try {
+                String taskOutput = String.format("Got coordinator task 
\"%s\": %s", taskId, JsonUtil.toPrettyJsonString(client.task(req)));
+                System.out.println(taskOutput);
+            } catch (NotFoundException e) {
+                System.out.println(e.getMessage());
+            }
         } else if (res.getString("create_task") != null) {
             CreateTaskRequest req = JsonUtil.JSON_SERDE.
                 readValue(res.getString("create_task"), 
CreateTaskRequest.class);
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
index cbfbddd..9163720 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
@@ -22,7 +22,9 @@ import org.apache.kafka.trogdor.rest.CreateTaskRequest;
 import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.TaskRequest;
 import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 
 import javax.servlet.ServletContext;
@@ -35,6 +37,8 @@ import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.core.MediaType;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
@@ -101,6 +105,16 @@ public class CoordinatorRestResource {
         return coordinator().tasks(new TasksRequest(taskId, firstStartMs, 
lastStartMs, firstEndMs, lastEndMs));
     }
 
+    @GET
+    @Path("/tasks/{taskId}")
+    public TaskState tasks(@PathParam("taskId") String taskId) throws 
Throwable {
+        TaskState response = coordinator().task(new TaskRequest(taskId));
+        if (response == null)
+            throw new NotFoundException(String.format("No task with ID \"%s\" 
exists.", taskId));
+
+        return response;
+    }
+
     @PUT
     @Path("/shutdown")
     public Empty beginShutdown(CoordinatorShutdownRequest request) throws 
Throwable {
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 74082bd..934acd3 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -36,6 +36,7 @@ import org.apache.kafka.trogdor.rest.TaskRunning;
 import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TaskStopping;
 import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerReceiving;
@@ -629,6 +630,36 @@ public final class TaskManager {
     }
 
     /**
+     * Get information about a single task being managed.
+     *
+     * Returns #{@code null} if the task does not exist
+     */
+    public TaskState task(TaskRequest request) throws ExecutionException, 
InterruptedException {
+        return executor.submit(new GetTaskState(request)).get();
+    }
+
+    /**
+     * Gets information about the tasks being managed.  Processed by the state 
change thread.
+     */
+    class GetTaskState implements Callable<TaskState> {
+        private final TaskRequest request;
+
+        GetTaskState(TaskRequest request) {
+            this.request = request;
+        }
+
+        @Override
+        public TaskState call() throws Exception {
+            ManagedTask task = tasks.get(request.taskId());
+            if (task == null) {
+                return null;
+            }
+
+            return task.taskState();
+        }
+    }
+
+    /**
      * Initiate shutdown, but do not wait for it to complete.
      */
     public void beginShutdown(boolean stopAgents) throws ExecutionException, 
InterruptedException {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRequest.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRequest.java
new file mode 100644
index 0000000..e42738f
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRequest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The request to /coordinator/tasks/{taskId}
+ */
+public class TaskRequest {
+    private final String taskId;
+
+    @JsonCreator
+    public TaskRequest(@JsonProperty("taskId") String taskId) {
+        this.taskId = taskId == null ? "" : taskId;
+    }
+
+    @JsonProperty
+    public String taskId() {
+        return taskId;
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java 
b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
index 121281f..b0e30a0 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
@@ -71,7 +71,7 @@ public class ExpectedTasks {
         }
     }
 
-    static class ExpectedTask {
+    public static class ExpectedTask {
         private final String id;
         private final TaskSpec taskSpec;
         private final TaskState taskState;
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java 
b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index e943484..f22130e 100644
--- 
a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -41,7 +41,9 @@ import org.apache.kafka.trogdor.rest.StopTaskRequest;
 import org.apache.kafka.trogdor.rest.TaskDone;
 import org.apache.kafka.trogdor.rest.TaskPending;
 import org.apache.kafka.trogdor.rest.TaskRunning;
+import org.apache.kafka.trogdor.rest.TaskRequest;
 import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
@@ -53,6 +55,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.junit.Test;
 
+import javax.ws.rs.NotFoundException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -491,6 +494,40 @@ public class CoordinatorTest {
     }
 
     @Test
+    public void testTaskRequest() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        Scheduler scheduler = new MockScheduler(time);
+        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+            addCoordinator("node01").
+            addAgent("node02").
+            scheduler(scheduler).
+            build()) {
+            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+
+            NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 10);
+            coordinatorClient.createTask(new CreateTaskRequest("foo", 
fooSpec));
+            TaskState expectedState = new 
ExpectedTaskBuilder("foo").taskState(new 
TaskPending(fooSpec)).build().taskState();
+
+            TaskState resp = coordinatorClient.task(new TaskRequest("foo"));
+            assertEquals(expectedState, resp);
+
+            time.sleep(2);
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 2, new 
TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 2, new 
TextNode("active"))).
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(cluster.agentClient("node02"));
+
+            try {
+                coordinatorClient.task(new TaskRequest("non-existent-foo"));
+                fail("Non existent task request should have raised a 
NotFoundException");
+            } catch (NotFoundException ignored) { }
+        }
+    }
+
+    @Test
     public void testWorkersExitingAtDifferentTimes() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
         Scheduler scheduler = new MockScheduler(time);

Reply via email to