[ 
https://issues.apache.org/jira/browse/KAFKA-6688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16429678#comment-16429678
 ] 

ASF GitHub Bot commented on KAFKA-6688:
---------------------------------------

rajinisivaram closed pull request #4737: KAFKA-6688. The Trogdor coordinator 
should track task statuses
URL: https://github.com/apache/kafka/pull/4737
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java 
b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index cda77738d8c..7c8de6d3f22 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
 import org.apache.kafka.trogdor.rest.WorkerDone;
@@ -29,6 +30,7 @@
 import org.apache.kafka.trogdor.rest.WorkerStarting;
 import org.apache.kafka.trogdor.rest.WorkerStopping;
 import org.apache.kafka.trogdor.rest.WorkerState;
+import org.apache.kafka.trogdor.task.AgentWorkerStatusTracker;
 import org.apache.kafka.trogdor.task.TaskSpec;
 import org.apache.kafka.trogdor.task.TaskWorker;
 import org.slf4j.Logger;
@@ -43,7 +45,6 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 public final class WorkerManager {
     private static final Logger log = 
LoggerFactory.getLogger(WorkerManager.class);
@@ -190,7 +191,7 @@ synchronized void waitForQuiescence() throws 
InterruptedException {
         /**
          * The worker status.
          */
-        private final AtomicReference<String> status = new 
AtomicReference<>("");
+        private final AgentWorkerStatusTracker status = new 
AgentWorkerStatusTracker();
 
         /**
          * The time when this task was started.
@@ -293,6 +294,8 @@ public void createWorker(final String id, TaskSpec spec) 
throws Exception {
             haltFuture.thenApply(new KafkaFuture.BaseFunction<String, Void>() {
                 @Override
                 public Void apply(String errorString) {
+                    if (errorString == null)
+                        errorString = "";
                     if (errorString.isEmpty()) {
                         log.info("{}: Worker {} is halting.", nodeName, id);
                     } else {
@@ -306,8 +309,9 @@ public Void apply(String errorString) {
             try {
                 worker.taskWorker.start(platform, worker.status, haltFuture);
             } catch (Exception e) {
+                log.info("{}: Worker {} start() exception", nodeName, id, e);
                 stateChangeExecutor.submit(new HandleWorkerHalting(worker,
-                    "worker.start() exception: " + e.getMessage(), true));
+                    "worker.start() exception: " + Utils.stackTrace(e), true));
             }
             stateChangeExecutor.submit(new FinishCreatingWorker(worker));
         }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index 0129007aa0d..91ef9c2928a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -49,7 +49,6 @@
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
-import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerReceiving;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.rest.WorkerStarting;
@@ -192,6 +191,9 @@ public void run() {
                     // agents going down?
                     return;
                 }
+                if (log.isTraceEnabled()) {
+                    log.trace("{}: got heartbeat status {}", node.name(), 
agentStatus);
+                }
                 // Identify workers which we think should be running, but 
which do not appear
                 // in the agent's response.  We need to send startWorker 
requests for these.
                 for (Map.Entry<String, ManagedWorker> entry : 
workers.entrySet()) {
@@ -203,40 +205,31 @@ public void run() {
                         }
                     }
                 }
-                // Identify tasks which are running, but which we don't know 
about.
-                // Add these to the NodeManager as tasks that should not be 
running.
                 for (Map.Entry<String, WorkerState> entry : 
agentStatus.workers().entrySet()) {
                     String id = entry.getKey();
                     WorkerState state = entry.getValue();
-                    if (!workers.containsKey(id)) {
+                    ManagedWorker worker = workers.get(id);
+                    if (worker == null) {
+                        // Identify tasks which are running, but which we 
don't know about.
+                        // Add these to the NodeManager as tasks that should 
not be running.
                         log.warn("{}: scheduling unknown worker {} for 
stopping.", node.name(), id);
                         workers.put(id, new ManagedWorker(id, state.spec(), 
false, state));
-                    }
-                }
-                // Handle workers which need to be stopped.  Handle workers 
which have newly completed.
-                for (Map.Entry<String, WorkerState> entry : 
agentStatus.workers().entrySet()) {
-                    String id = entry.getKey();
-                    WorkerState state = entry.getValue();
-                    ManagedWorker worker = workers.get(id);
-                    if (state instanceof WorkerStarting || state instanceof 
WorkerRunning) {
-                        if (!worker.shouldRun) {
-                            worker.tryStop();
-                        }
-                    } else if (state instanceof WorkerDone) {
-                        if (!(worker.state instanceof WorkerDone)) {
-                            WorkerDone workerDoneState =  (WorkerDone) state;
-                            String error = workerDoneState.error();
-                            if (error.isEmpty()) {
-                                log.info("{}: Worker {} finished with status 
'{}'",
-                                    node.name(), id, workerDoneState.status());
-                            } else {
-                                log.warn("{}: Worker {} finished with error 
'{}' and status '{}'",
-                                    node.name(), id, error, 
workerDoneState.status());
+                    } else {
+                        // Handle workers which need to be stopped.
+                        if (state instanceof WorkerStarting || state 
instanceof WorkerRunning) {
+                            if (!worker.shouldRun) {
+                                worker.tryStop();
                             }
-                            taskManager.handleWorkerCompletion(node.name(), 
worker.id, error);
+                        }
+                        // Notify the TaskManager if the worker state has 
changed.
+                        if (worker.state.equals(state)) {
+                            log.debug("{}: worker state is still {}", 
node.name(), worker.state);
+                        } else {
+                            log.info("{}: worker state changed from {} to {}", 
node.name(), worker.state, state);
+                            worker.state = state;
+                            taskManager.updateWorkerState(node.name(), 
worker.id, state);
                         }
                     }
-                    worker.state = state;
                 }
             } catch (Throwable e) {
                 log.error("{}: Unhandled exception in NodeHeartbeatRunnable", 
node.name(), e);
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index d88e1d56ed6..7e19c8b34ae 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -17,10 +17,14 @@
 
 package org.apache.kafka.trogdor.coordinator;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
@@ -31,13 +35,15 @@
 import org.apache.kafka.trogdor.rest.TaskStopping;
 import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
+import org.apache.kafka.trogdor.rest.WorkerDone;
+import org.apache.kafka.trogdor.rest.WorkerReceiving;
+import org.apache.kafka.trogdor.rest.WorkerState;
 import org.apache.kafka.trogdor.task.TaskController;
 import org.apache.kafka.trogdor.task.TaskSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -172,16 +178,9 @@
         private Future<?> startFuture = null;
 
         /**
-         * The name of the worker nodes involved with this task.
-         * Null if the task is not running.
+         * The states of the workers involved with this task.
          */
-        private Set<String> workers = null;
-
-        /**
-         * The names of the worker nodes which are still running this task.
-         * Null if the task is not running.
-         */
-        private Set<String> activeWorkers = null;
+        public Map<String, WorkerState> workerStates = new TreeMap<>();
 
         /**
          * If this is non-empty, a message describing how this task failed.
@@ -241,14 +240,39 @@ TaskState taskState() {
                 case PENDING:
                     return new TaskPending(spec);
                 case RUNNING:
-                    return new TaskRunning(spec, startedMs);
+                    return new TaskRunning(spec, startedMs, 
getCombinedStatus(workerStates));
                 case STOPPING:
-                    return new TaskStopping(spec, startedMs);
+                    return new TaskStopping(spec, startedMs, 
getCombinedStatus(workerStates));
                 case DONE:
-                    return new TaskDone(spec, startedMs, doneMs, error, 
cancelled);
+                    return new TaskDone(spec, startedMs, doneMs, error, 
cancelled, getCombinedStatus(workerStates));
             }
             throw new RuntimeException("unreachable");
         }
+
+        TreeSet<String> activeWorkers() {
+            TreeSet<String> workerNames = new TreeSet<>();
+            for (Map.Entry<String, WorkerState> entry : 
workerStates.entrySet()) {
+                if (!entry.getValue().done()) {
+                    workerNames.add(entry.getKey());
+                }
+            }
+            return workerNames;
+        }
+    }
+
+    private static final JsonNode getCombinedStatus(Map<String, WorkerState> 
states) {
+        if (states.size() == 1) {
+            return states.values().iterator().next().status();
+        } else {
+            ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
+            for (Map.Entry<String, WorkerState> entry : states.entrySet()) {
+                JsonNode node = entry.getValue().status();
+                if (node != null) {
+                    objectNode.set(entry.getKey(), node);
+                }
+            }
+            return objectNode;
+        }
     }
 
     /**
@@ -349,10 +373,8 @@ public Void call() throws Exception {
             log.info("Running task {} on node(s): {}", task.id, 
Utils.join(nodeNames, ", "));
             task.state = ManagedTaskState.RUNNING;
             task.startedMs = time.milliseconds();
-            task.workers = nodeNames;
-            task.activeWorkers = new HashSet<>();
-            for (String workerName : task.workers) {
-                task.activeWorkers.add(workerName);
+            for (String workerName : nodeNames) {
+                task.workerStates.put(workerName, new 
WorkerReceiving(task.spec));
                 nodeManagers.get(workerName).createWorker(task.id, task.spec);
             }
             return null;
@@ -398,15 +420,16 @@ public TaskSpec call() throws Exception {
                     break;
                 case RUNNING:
                     task.cancelled = true;
-                    if (task.activeWorkers.size() == 0) {
+                    TreeSet<String> activeWorkers = task.activeWorkers();
+                    if (activeWorkers.isEmpty()) {
                         log.info("Task {} is now complete with error: {}", id, 
task.error);
                         task.doneMs = time.milliseconds();
                         task.state = ManagedTaskState.DONE;
                     } else {
-                        for (String workerName : task.activeWorkers) {
+                        for (String workerName : activeWorkers) {
                             nodeManagers.get(workerName).stopWorker(id);
                         }
-                        log.info("Cancelling task {} on worker(s): {}", id, 
Utils.join(task.activeWorkers, ", "));
+                        log.info("Cancelling task {} on worker(s): {}", id, 
Utils.join(activeWorkers, ", "));
                         task.state = ManagedTaskState.STOPPING;
                     }
                     break;
@@ -422,65 +445,79 @@ public TaskSpec call() throws Exception {
     }
 
     /**
-     * A callback NodeManager makes to indicate that a worker has completed.
-     * The task will transition to DONE once all workers are done.
+     * Update the state of a particular agent's worker.
      *
-     * @param nodeName      The node name.
+     * @param nodeName      The node where the agent is running.
      * @param id            The worker name.
-     * @param error         An empty string if there is no error, or an error 
string.
+     * @param state         The worker state.
      */
-    public void handleWorkerCompletion(String nodeName, String id, String 
error) {
-        executor.submit(new HandleWorkerCompletion(nodeName, id, error));
+    public void updateWorkerState(String nodeName, String id, WorkerState 
state) {
+        executor.submit(new UpdateWorkerState(nodeName, id, state));
     }
 
-    class HandleWorkerCompletion implements Callable<Void> {
+    class UpdateWorkerState implements Callable<Void> {
         private final String nodeName;
         private final String id;
-        private final String error;
+        private final WorkerState state;
 
-        HandleWorkerCompletion(String nodeName, String id, String error) {
+        UpdateWorkerState(String nodeName, String id, WorkerState state) {
             this.nodeName = nodeName;
             this.id = id;
-            this.error = error;
+            this.state = state;
         }
 
         @Override
         public Void call() throws Exception {
             ManagedTask task = tasks.get(id);
             if (task == null) {
-                log.error("Can't handle completion of unknown worker {} on 
node {}",
+                log.error("Can't update worker state unknown worker {} on node 
{}",
                     id, nodeName);
                 return null;
             }
-            if ((task.state == ManagedTaskState.PENDING) || (task.state == 
ManagedTaskState.DONE)) {
-                log.error("Task {} got unexpected worker completion from {} 
while " +
-                    "in {} state.", id, nodeName, task.state);
-                return null;
-            }
-            boolean broadcastStop = false;
-            if (task.state == ManagedTaskState.RUNNING) {
-                task.state = ManagedTaskState.STOPPING;
-                broadcastStop = true;
-            }
-            task.maybeSetError(error);
-            task.activeWorkers.remove(nodeName);
-            if (task.activeWorkers.size() == 0) {
-                task.doneMs = time.milliseconds();
-                task.state = ManagedTaskState.DONE;
-                log.info("Task {} is now complete on {} with error: {}", id,
-                    Utils.join(task.workers, ", "),
-                    task.error.isEmpty() ? "(none)" : task.error);
-            } else if (broadcastStop) {
-                log.info("Node {} stopped.  Stopping task {} on worker(s): {}",
-                    id, Utils.join(task.activeWorkers, ", "));
-                for (String workerName : task.activeWorkers) {
-                    nodeManagers.get(workerName).stopWorker(id);
-                }
+            WorkerState prevState = task.workerStates.get(nodeName);
+            log.debug("Task {}: Updating worker state for {} from {} to {}.",
+                id, nodeName, prevState, state);
+            task.workerStates.put(nodeName, state);
+            if (state.done() && (!prevState.done())) {
+                handleWorkerCompletion(task, nodeName, (WorkerDone) state);
             }
             return null;
         }
     }
 
+    /**
+     * Handle a worker being completed.
+     *
+     * @param task      The task that owns the worker.
+     * @param nodeName  The name of the node on which the worker is running.
+     * @param state     The worker state.
+     */
+    private void handleWorkerCompletion(ManagedTask task, String nodeName, 
WorkerDone state) {
+        if (state.error().isEmpty()) {
+            log.info("{}: Worker {} finished with status '{}'",
+                nodeName, task.id, JsonUtil.toJsonString(state.status()));
+        } else {
+            log.warn("{}: Worker {} finished with error '{}' and status '{}'",
+                nodeName, task.id, state.error(), 
JsonUtil.toJsonString(state.status()));
+            task.maybeSetError(state.error());
+        }
+        if (task.activeWorkers().isEmpty()) {
+            task.doneMs = time.milliseconds();
+            task.state = ManagedTaskState.DONE;
+            log.info("{}: Task {} is now complete on {} with error: {}",
+                nodeName, task.id, Utils.join(task.workerStates.keySet(), ", 
"),
+                task.error.isEmpty() ? "(none)" : task.error);
+        } else if ((task.state == ManagedTaskState.RUNNING) && 
(!task.error.isEmpty())) {
+            TreeSet<String> activeWorkers = task.activeWorkers();
+            log.info("{}: task {} stopped with error {}.  Stopping worker(s): 
{}",
+                nodeName, task.id, task.error, Utils.join(activeWorkers, ", 
"));
+            task.state = ManagedTaskState.STOPPING;
+            for (String workerName : activeWorkers) {
+                nodeManagers.get(workerName).stopWorker(task.id);
+            }
+        }
+    }
+
     /**
      * Get information about the tasks being managed.
      */
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java 
b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
index 629d15e8147..97934a88bbd 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
@@ -17,15 +17,15 @@
 
 package org.apache.kafka.trogdor.fault;
 
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.fault.Kibosh.KiboshFaultSpec;
 import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 public class KiboshFaultWorker implements TaskWorker {
     private static final Logger log = 
LoggerFactory.getLogger(KiboshFaultWorker.class);
 
@@ -35,6 +35,8 @@
 
     private final String mountPath;
 
+    private WorkerStatusTracker status;
+
     public KiboshFaultWorker(String id, KiboshFaultSpec spec, String 
mountPath) {
         this.id = id;
         this.spec = spec;
@@ -42,15 +44,20 @@ public KiboshFaultWorker(String id, KiboshFaultSpec spec, 
String mountPath) {
     }
 
     @Override
-    public void start(Platform platform, AtomicReference<String> status,
+    public void start(Platform platform, WorkerStatusTracker status,
                       KafkaFutureImpl<String> errorFuture) throws Exception {
         log.info("Activating {} {}: {}.", spec.getClass().getSimpleName(), id, 
spec);
+        this.status = status;
+        this.status.update(new TextNode("Adding fault " + id));
         Kibosh.INSTANCE.addFault(mountPath, spec);
+        this.status.update(new TextNode("Added fault " + id));
     }
 
     @Override
     public void stop(Platform platform) throws Exception {
         log.info("Deactivating {} {}: {}.", spec.getClass().getSimpleName(), 
id, spec);
+        this.status.update(new TextNode("Removing fault " + id));
         Kibosh.INSTANCE.removeFault(mountPath, spec);
+        this.status.update(new TextNode("Removed fault " + id));
     }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
index 787c5e06f18..1b99a93d8fb 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
@@ -17,11 +17,13 @@
 
 package org.apache.kafka.trogdor.fault;
 
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.Topology;
 import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,7 +31,6 @@
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class NetworkPartitionFaultWorker implements TaskWorker {
     private static final Logger log = 
LoggerFactory.getLogger(NetworkPartitionFaultWorker.class);
@@ -38,22 +39,29 @@
 
     private final List<Set<String>> partitionSets;
 
+    private WorkerStatusTracker status;
+
     public NetworkPartitionFaultWorker(String id, List<Set<String>> 
partitionSets) {
         this.id = id;
         this.partitionSets = partitionSets;
     }
 
     @Override
-    public void start(Platform platform, AtomicReference<String> status,
+    public void start(Platform platform, WorkerStatusTracker status,
                       KafkaFutureImpl<String> errorFuture) throws Exception {
         log.info("Activating NetworkPartitionFault {}.", id);
+        this.status = status;
+        this.status.update(new TextNode("creating network partition " + id));
         runIptablesCommands(platform, "-A");
+        this.status.update(new TextNode("created network partition " + id));
     }
 
     @Override
     public void stop(Platform platform) throws Exception {
         log.info("Deactivating NetworkPartitionFault {}.", id);
+        this.status.update(new TextNode("removing network partition " + id));
         runIptablesCommands(platform, "-D");
+        this.status.update(new TextNode("removed network partition " + id));
     }
 
     private void runIptablesCommands(Platform platform, String iptablesAction) 
throws Exception {
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java
index 66a8c6edb20..d30eaf766b9 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java
@@ -17,16 +17,17 @@
 
 package org.apache.kafka.trogdor.fault;
 
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class ProcessStopFaultWorker implements TaskWorker {
     private static final Logger log = 
LoggerFactory.getLogger(ProcessStopFaultWorker.class);
@@ -35,22 +36,29 @@
 
     private final String javaProcessName;
 
+    private WorkerStatusTracker status;
+
     public ProcessStopFaultWorker(String id, String javaProcessName) {
         this.id = id;
         this.javaProcessName = javaProcessName;
     }
 
     @Override
-    public void start(Platform platform, AtomicReference<String> status,
+    public void start(Platform platform, WorkerStatusTracker status,
                       KafkaFutureImpl<String> errorFuture) throws Exception {
+        this.status = status;
         log.info("Activating ProcessStopFault {}.", id);
+        this.status.update(new TextNode("stopping " + javaProcessName));
         sendSignals(platform, "SIGSTOP");
+        this.status.update(new TextNode("stopped " + javaProcessName));
     }
 
     @Override
     public void stop(Platform platform) throws Exception {
         log.info("Deactivating ProcessStopFault {}.", id);
+        this.status.update(new TextNode("resuming " + javaProcessName));
         sendSignals(platform, "SIGCONT");
+        this.status.update(new TextNode("resumed " + javaProcessName));
     }
 
     private void sendSignals(Platform platform, String signalName) throws 
Exception {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
index 536d3f20b33..e8d6003bede 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
@@ -16,9 +16,9 @@
  */
 
 package org.apache.kafka.trogdor.rest;
-
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -50,8 +50,9 @@ public TaskDone(@JsonProperty("spec") TaskSpec spec,
             @JsonProperty("startedMs") long startedMs,
             @JsonProperty("doneMs") long doneMs,
             @JsonProperty("error") String error,
-            @JsonProperty("cancelled") boolean cancelled) {
-        super(spec);
+            @JsonProperty("cancelled") boolean cancelled,
+            @JsonProperty("status") JsonNode status) {
+        super(spec, status);
         this.startedMs = startedMs;
         this.doneMs = doneMs;
         this.error = error;
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
index b0162d35c71..7831301425c 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
@@ -19,6 +19,7 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -27,6 +28,6 @@
 public class TaskPending extends TaskState {
     @JsonCreator
     public TaskPending(@JsonProperty("spec") TaskSpec spec) {
-        super(spec);
+        super(spec, NullNode.instance);
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
index bff36766c4d..7a81bdf7867 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
@@ -19,6 +19,7 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -32,8 +33,9 @@
 
     @JsonCreator
     public TaskRunning(@JsonProperty("spec") TaskSpec spec,
-            @JsonProperty("startedMs") long startedMs) {
-        super(spec);
+            @JsonProperty("startedMs") long startedMs,
+            @JsonProperty("status") JsonNode status) {
+        super(spec, status);
         this.startedMs = startedMs;
     }
 
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
index 28b61087ad0..0764e1445c8 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
@@ -20,6 +20,8 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -37,12 +39,20 @@
 public abstract class TaskState extends Message {
     private final TaskSpec spec;
 
-    public TaskState(TaskSpec spec) {
+    private final JsonNode status;
+
+    public TaskState(TaskSpec spec, JsonNode status) {
         this.spec = spec;
+        this.status = status == null ? NullNode.instance : status;
     }
 
     @JsonProperty
     public TaskSpec spec() {
         return spec;
     }
+
+    @JsonProperty
+    public JsonNode status() {
+        return status;
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
index 4446b75f9fc..d40b43c485c 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
@@ -19,6 +19,7 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -32,8 +33,9 @@
 
     @JsonCreator
     public TaskStopping(@JsonProperty("spec") TaskSpec spec,
-            @JsonProperty("startedMs") long startedMs) {
-        super(spec);
+            @JsonProperty("startedMs") long startedMs,
+            @JsonProperty("status") JsonNode status) {
+        super(spec, status);
         this.startedMs = startedMs;
     }
 
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
index e463ffc3451..500d3c6a0c2 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
@@ -19,6 +19,8 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -39,7 +41,7 @@
      * The task status.  The format will depend on the type of task that is
      * being run.
      */
-    private final String status;
+    private final JsonNode status;
 
     /**
      * Empty if the task completed without error; the error message otherwise.
@@ -50,12 +52,12 @@
     public WorkerDone(@JsonProperty("spec") TaskSpec spec,
             @JsonProperty("startedMs") long startedMs,
             @JsonProperty("doneMs") long doneMs,
-            @JsonProperty("status") String status,
+            @JsonProperty("status") JsonNode status,
             @JsonProperty("error") String error) {
         super(spec);
         this.startedMs = startedMs;
         this.doneMs = doneMs;
-        this.status = status == null ? "" : status;
+        this.status = status == null ? NullNode.instance : status;
         this.error = error == null ? "" : error;
     }
 
@@ -72,7 +74,7 @@ public long doneMs() {
 
     @JsonProperty
     @Override
-    public String status() {
+    public JsonNode status() {
         return status;
     }
 
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
index d3e356555ba..70687743f74 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
@@ -19,6 +19,8 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -30,4 +32,9 @@
     public WorkerReceiving(@JsonProperty("spec") TaskSpec spec) {
         super(spec);
     }
+
+    @Override
+    public JsonNode status() {
+        return new TextNode("receiving");
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
index e3b8d1932b2..af8ee88a1ab 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
@@ -19,6 +19,8 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -34,15 +36,15 @@
      * The task status.  The format will depend on the type of task that is
      * being run.
      */
-    private final String status;
+    private final JsonNode status;
 
     @JsonCreator
     public WorkerRunning(@JsonProperty("spec") TaskSpec spec,
             @JsonProperty("startedMs") long startedMs,
-            @JsonProperty("status") String status) {
+            @JsonProperty("status") JsonNode status) {
         super(spec);
         this.startedMs = startedMs;
-        this.status = status == null ? "" : status;
+        this.status = status == null ? NullNode.instance : status;
     }
 
     @JsonProperty
@@ -53,7 +55,7 @@ public long startedMs() {
 
     @JsonProperty
     @Override
-    public String status() {
+    public JsonNode status() {
         return status;
     }
 
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
index 3a766ea3808..b568ec1f887 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
@@ -19,6 +19,8 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -29,4 +31,9 @@
     public WorkerStarting(@JsonProperty("spec") TaskSpec spec) {
         super(spec);
     }
+
+    @Override
+    public JsonNode status() {
+        return new TextNode("starting");
+    }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
index 6d7c687c338..044d719f894 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
@@ -20,6 +20,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
@@ -60,9 +61,7 @@ public long startedMs() {
         throw new KafkaException("invalid state");
     }
 
-    public String status() {
-        throw new KafkaException("invalid state");
-    }
+    public abstract JsonNode status();
 
     public boolean running() {
         return false;
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
index 777e5114edc..9fbb3ff7306 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
@@ -19,6 +19,8 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -34,15 +36,15 @@
      * The task status.  The format will depend on the type of task that is
      * being run.
      */
-    private final String status;
+    private final JsonNode status;
 
     @JsonCreator
     public WorkerStopping(@JsonProperty("spec") TaskSpec spec,
             @JsonProperty("startedMs") long startedMs,
-            @JsonProperty("status") String status) {
+            @JsonProperty("status") JsonNode status) {
         super(spec);
         this.startedMs = startedMs;
-        this.status = status == null ? "" : status;
+        this.status = status == null ? NullNode.instance : status;
     }
 
     @JsonProperty
@@ -53,7 +55,7 @@ public long startedMs() {
 
     @JsonProperty
     @Override
-    public String status() {
+    public JsonNode status() {
         return status;
     }
 
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/task/AgentWorkerStatusTracker.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/task/AgentWorkerStatusTracker.java
new file mode 100644
index 00000000000..2ad8e4ee4ae
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/task/AgentWorkerStatusTracker.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.task;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+
+/**
+ * Tracks the status of a Trogdor worker.
+ */
+public class AgentWorkerStatusTracker implements WorkerStatusTracker {
+    private JsonNode status = NullNode.instance;
+
+    @Override
+    public void update(JsonNode newStatus) {
+        JsonNode status = newStatus.deepCopy();
+        synchronized (this) {
+            this.status = status;
+        }
+    }
+
+    /**
+     * Retrieves the status.
+     */
+    public synchronized JsonNode get() {
+        return status;
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java 
b/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java
index dfa80842260..77336d87059 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java
@@ -17,30 +17,34 @@
 
 package org.apache.kafka.trogdor.task;
 
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.trogdor.common.Platform;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 public class NoOpTaskWorker implements TaskWorker {
     private static final Logger log = 
LoggerFactory.getLogger(NoOpTaskWorker.class);
 
     private final String id;
 
+    private WorkerStatusTracker status;
+
     public NoOpTaskWorker(String id) {
         this.id = id;
     }
 
     @Override
-    public void start(Platform platform, AtomicReference<String> status,
+    public void start(Platform platform, WorkerStatusTracker status,
                       KafkaFutureImpl<String> errorFuture) throws Exception {
         log.info("{}: Activating NoOpTask.", id);
+        this.status = status;
+        this.status.update(new TextNode("active"));
     }
 
     @Override
     public void stop(Platform platform) throws Exception {
         log.info("{}: Deactivating NoOpTask.", id);
+        this.status.update(new TextNode("done"));
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java 
b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java
index 288eb9ce147..042568f1a83 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java
@@ -20,8 +20,6 @@
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.trogdor.common.Platform;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 /**
  * The agent-side interface for implementing tasks.
  */
@@ -42,7 +40,7 @@
      *
      *
      * @param platform          The platform to use.
-     * @param status            The current status string.  The TaskWorker can 
update
+     * @param status            The current status.  The TaskWorker can update
      *                          this at any time to provide an updated status.
      * @param haltFuture        A future which the worker should complete if 
it halts.
      *                          If it is completed with an empty string, that 
means the task
@@ -53,7 +51,7 @@
      *
      * @throws Exception        If the TaskWorker failed to start.  stop() 
will not be invoked.
      */
-    void start(Platform platform, AtomicReference<String> status, 
KafkaFutureImpl<String> haltFuture)
+    void start(Platform platform, WorkerStatusTracker status, 
KafkaFutureImpl<String> haltFuture)
         throws Exception;
 
     /**
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/task/WorkerStatusTracker.java 
b/tools/src/main/java/org/apache/kafka/trogdor/task/WorkerStatusTracker.java
new file mode 100644
index 00000000000..dfbc7ea6e44
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/WorkerStatusTracker.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.task;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Tracks the status of a Trogdor worker.
+ */
+public interface WorkerStatusTracker {
+    /**
+     * Updates the status.
+     *
+     * @param status    The new status.
+     */
+    void update(JsonNode status);
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index a891b83bcc1..4c3095f0fbd 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -19,6 +19,7 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -34,6 +35,7 @@
 import org.apache.kafka.trogdor.common.ThreadUtils;
 import org.apache.kafka.trogdor.common.WorkerUtils;
 import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +48,6 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class ProduceBenchWorker implements TaskWorker {
     private static final Logger log = 
LoggerFactory.getLogger(ProduceBenchWorker.class);
@@ -61,7 +62,7 @@
 
     private ScheduledExecutorService executor;
 
-    private AtomicReference<String> status;
+    private WorkerStatusTracker status;
 
     private KafkaFutureImpl<String> doneFuture;
 
@@ -81,7 +82,7 @@ public ProduceBenchWorker(String id, ProduceBenchSpec spec) {
     }
 
     @Override
-    public void start(Platform platform, AtomicReference<String> status,
+    public void start(Platform platform, WorkerStatusTracker status,
                       KafkaFutureImpl<String> doneFuture) throws Exception {
         if (!running.compareAndSet(false, true)) {
             throw new IllegalStateException("ProducerBenchWorker is already 
running.");
@@ -112,9 +113,10 @@ public void run() {
                     newTopics.put(name, new NewTopic(name, 
spec.numPartitions(),
                                                      
spec.replicationFactor()));
                 }
+                status.update(new TextNode("Creating " + spec.totalTopics() + 
" topic(s)"));
                 WorkerUtils.createTopics(log, spec.bootstrapServers(), 
spec.commonClientConf(),
                                          spec.adminClientConf(), newTopics, 
false);
-
+                status.update(new TextNode("Created " + spec.totalTopics() + " 
topic(s)"));
                 executor.submit(new SendRecords());
             } catch (Throwable e) {
                 WorkerUtils.abort(log, "Prepare", e, doneFuture);
@@ -181,7 +183,7 @@ protected synchronized void delay(long amount) throws 
InterruptedException {
             this.histogram = new Histogram(5000);
             int perPeriod = 
WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
             this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
-                new StatusUpdater(histogram), 1, 1, TimeUnit.MINUTES);
+                new StatusUpdater(histogram), 30, 30, TimeUnit.SECONDS);
             Properties props = new Properties();
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
             // add common client configs to producer properties, and then 
user-specified producer
@@ -218,10 +220,10 @@ public Void call() throws Exception {
                 WorkerUtils.abort(log, "SendRecords", e, doneFuture);
             } finally {
                 statusUpdaterFuture.cancel(false);
-                new StatusUpdater(histogram).run();
+                StatusData statusData = new StatusUpdater(histogram).update();
                 long curTimeMs = Time.SYSTEM.milliseconds();
                 log.info("Sent {} total record(s) in {} ms.  status: {}",
-                    histogram.summarize().numSamples(), curTimeMs - 
startTimeMs, status.get());
+                    histogram.summarize().numSamples(), curTimeMs - 
startTimeMs, statusData);
             }
             doneFuture.complete("");
             return null;
@@ -234,46 +236,54 @@ void recordDuration(long durationMs) {
 
     public class StatusUpdater implements Runnable {
         private final Histogram histogram;
-        private final float[] percentiles;
 
         StatusUpdater(Histogram histogram) {
             this.histogram = histogram;
-            this.percentiles = new float[] {0.50f, 0.95f, 0.99f};
         }
 
         @Override
         public void run() {
             try {
-                Histogram.Summary summary = histogram.summarize(percentiles);
-                StatusData statusData = new StatusData(summary.numSamples(), 
summary.average(),
-                    summary.percentiles().get(0).value(),
-                    summary.percentiles().get(1).value(),
-                    summary.percentiles().get(2).value());
-                String statusDataString = JsonUtil.toJsonString(statusData);
-                status.set(statusDataString);
+                update();
             } catch (Exception e) {
                 WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
             }
         }
+
+        StatusData update() {
+            Histogram.Summary summary = 
histogram.summarize(StatusData.PERCENTILES);
+            StatusData statusData = new StatusData(summary.numSamples(), 
summary.average(),
+                summary.percentiles().get(0).value(),
+                summary.percentiles().get(1).value(),
+                summary.percentiles().get(2).value());
+            status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
+            return statusData;
+        }
     }
 
     public static class StatusData {
         private final long totalSent;
         private final float averageLatencyMs;
         private final int p50LatencyMs;
-        private final int p90LatencyMs;
+        private final int p95LatencyMs;
         private final int p99LatencyMs;
 
+        /**
+         * The percentiles to use when calculating the histogram data.
+         * These should match up with the p50LatencyMs, p95LatencyMs, etc. 
fields.
+         */
+        final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
+
         @JsonCreator
         StatusData(@JsonProperty("totalSent") long totalSent,
                    @JsonProperty("averageLatencyMs") float averageLatencyMs,
                    @JsonProperty("p50LatencyMs") int p50latencyMs,
-                   @JsonProperty("p90LatencyMs") int p90latencyMs,
+                   @JsonProperty("p95LatencyMs") int p95latencyMs,
                    @JsonProperty("p99LatencyMs") int p99latencyMs) {
             this.totalSent = totalSent;
             this.averageLatencyMs = averageLatencyMs;
             this.p50LatencyMs = p50latencyMs;
-            this.p90LatencyMs = p90latencyMs;
+            this.p95LatencyMs = p95latencyMs;
             this.p99LatencyMs = p99latencyMs;
         }
 
@@ -293,8 +303,8 @@ public int p50LatencyMs() {
         }
 
         @JsonProperty
-        public int p90LatencyMs() {
-            return p90LatencyMs;
+        public int p95LatencyMs() {
+            return p95LatencyMs;
         }
 
         @JsonProperty
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 08b11ac5603..12b0c08a700 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -39,6 +39,7 @@
 import org.apache.kafka.trogdor.common.ThreadUtils;
 import org.apache.kafka.trogdor.common.WorkerUtils;
 import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,7 +56,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class RoundTripWorker implements TaskWorker {
     private static final int THROTTLE_PERIOD_MS = 100;
@@ -98,7 +98,7 @@ public RoundTripWorker(String id, RoundTripWorkloadSpec spec) 
{
     }
 
     @Override
-    public void start(Platform platform, AtomicReference<String> status,
+    public void start(Platform platform, WorkerStatusTracker status,
                       KafkaFutureImpl<String> doneFuture) throws Exception {
         if (!running.compareAndSet(false, true)) {
             throw new IllegalStateException("RoundTripWorker is already 
running.");
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java 
b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index 30d13b55cb9..61de5c98797 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.trogdor.agent;
 
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.utils.MockScheduler;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Scheduler;
@@ -122,7 +123,7 @@ public void testAgentCreateWorkers() throws Exception {
         CreateWorkerResponse response = client.createWorker(new 
CreateWorkerRequest("foo", fooSpec));
         assertEquals(fooSpec.toString(), response.spec().toString());
         new ExpectedTasks().addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, "")).
+                workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -131,10 +132,10 @@ public void testAgentCreateWorkers() throws Exception {
         client.createWorker(new CreateWorkerRequest("bar", barSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, "")).
+                workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 0, "")).
+                workerState(new WorkerRunning(barSpec, 0, new 
TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -142,13 +143,13 @@ public void testAgentCreateWorkers() throws Exception {
         client.createWorker(new CreateWorkerRequest("baz", bazSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, "")).
+                workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 0, "")).
+                workerState(new WorkerRunning(barSpec, 0, new 
TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("baz").
-                workerState(new WorkerRunning(bazSpec, 0, "")).
+                workerState(new WorkerRunning(bazSpec, 0, new 
TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -169,7 +170,7 @@ public void testAgentFinishesTasks() throws Exception {
         client.createWorker(new CreateWorkerRequest("foo", fooSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, "")).
+                workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -179,10 +180,10 @@ public void testAgentFinishesTasks() throws Exception {
         client.createWorker(new CreateWorkerRequest("bar", barSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, "")).
+                workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 1, "")).
+                workerState(new WorkerRunning(barSpec, 1, new 
TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -190,10 +191,10 @@ public void testAgentFinishesTasks() throws Exception {
 
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 2, "", "")).
+                workerState(new WorkerDone(fooSpec, 0, 2, new 
TextNode("done"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 1, "")).
+                workerState(new WorkerRunning(barSpec, 1, new 
TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -201,10 +202,10 @@ public void testAgentFinishesTasks() throws Exception {
         client.stopWorker(new StopWorkerRequest("bar"));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 2, "", "")).
+                workerState(new WorkerDone(fooSpec, 0, 2, new 
TextNode("done"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerDone(barSpec, 1, 7, "", "")).
+                workerState(new WorkerDone(barSpec, 1, 7, new 
TextNode("done"), "")).
                 build()).
             waitFor(client);
 
@@ -221,34 +222,40 @@ public void testWorkerCompletions() throws Exception {
             maxTries(10).target("localhost", agent.port()).build();
         new ExpectedTasks().waitFor(client);
 
-        SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000, 1, "");
+        SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000,
+            Collections.singletonMap("node01", 1L), "");
         client.createWorker(new CreateWorkerRequest("foo", fooSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, "")).
+                workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("active"))).
                 build()).
             waitFor(client);
 
-        SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000, 2, "baz");
+        SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000,
+            Collections.singletonMap("node01", 2L), "baz");
         client.createWorker(new CreateWorkerRequest("bar", barSpec));
 
         time.sleep(1);
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 1, "", "")).
+                workerState(new WorkerDone(fooSpec, 0, 1,
+                    new TextNode("halted"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 0, "")).
+                workerState(new WorkerRunning(barSpec, 0,
+                    new TextNode("active"))).
                 build()).
             waitFor(client);
 
         time.sleep(1);
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 1, "", "")).
+                workerState(new WorkerDone(fooSpec, 0, 1,
+                    new TextNode("halted"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerDone(barSpec, 0, 2, "", "baz")).
+                workerState(new WorkerDone(barSpec, 0, 2,
+                    new TextNode("halted"), "baz")).
                 build()).
             waitFor(client);
     }
@@ -289,7 +296,7 @@ public void testKiboshFaults() throws Exception {
             client.createWorker(new CreateWorkerRequest("foo", fooSpec));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    workerState(new WorkerRunning(fooSpec, 0, "")).
+                    workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("Added fault foo"))).
                     build()).
                 waitFor(client);
             Assert.assertEquals(new 
KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
@@ -299,9 +306,9 @@ public void testKiboshFaults() throws Exception {
             client.createWorker(new CreateWorkerRequest("bar", barSpec));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    workerState(new WorkerRunning(fooSpec, 0, "")).build()).
+                    workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("Added fault foo"))).build()).
                 addTask(new ExpectedTaskBuilder("bar").
-                    workerState(new WorkerRunning(barSpec, 0, "")).build()).
+                    workerState(new WorkerRunning(barSpec, 0, new 
TextNode("Added fault bar"))).build()).
                 waitFor(client);
             Assert.assertEquals(new KiboshControlFile(new 
ArrayList<Kibosh.KiboshFaultSpec>() {{
                     add(new KiboshFilesUnreadableFaultSpec("/foo", 123));
@@ -311,9 +318,9 @@ public void testKiboshFaults() throws Exception {
             client.stopWorker(new StopWorkerRequest("foo"));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    workerState(new WorkerDone(fooSpec, 0, 1, "", 
"")).build()).
+                    workerState(new WorkerDone(fooSpec, 0, 1, new 
TextNode("Removed fault foo"), "")).build()).
                 addTask(new ExpectedTaskBuilder("bar").
-                    workerState(new WorkerRunning(barSpec, 0, "")).build()).
+                    workerState(new WorkerRunning(barSpec, 0, new 
TextNode("Added fault bar"))).build()).
                 waitFor(client);
             Assert.assertEquals(new 
KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
                 new KiboshFilesUnreadableFaultSpec("/bar", 456))), 
mockKibosh.read());
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
 
b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 76b206bf135..8101d9c6e4e 100644
--- 
a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -52,7 +52,7 @@ public void testDeserializationDoesNotProduceNulls() throws 
Exception {
             0, 0, null, null, null, null, null, 0, 0, "test-topic", 1, (short) 
3));
         verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, 
null,
             0, null, null, 0));
-        verify(new SampleTaskSpec(0, 0, 0, null));
+        verify(new SampleTaskSpec(0, 0, null, null));
     }
 
     private <T> void verify(T val1) throws Exception {
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java 
b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index 004702f286f..34d7ffe6106 100644
--- 
a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.trogdor.coordinator;
 
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.utils.MockScheduler;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Scheduler;
@@ -41,6 +44,7 @@
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.task.NoOpTaskSpec;
+import org.apache.kafka.trogdor.task.SampleTaskSpec;
 import org.junit.Rule;
 import org.junit.rules.Timeout;
 import org.slf4j.Logger;
@@ -49,6 +53,7 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -94,8 +99,8 @@ public void testCreateTask() throws Exception {
             time.sleep(2);
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskRunning(fooSpec, 2)).
-                    workerState(new WorkerRunning(fooSpec, 2, "")).
+                    taskState(new TaskRunning(fooSpec, 2, new 
TextNode("active"))).
+                    workerState(new WorkerRunning(fooSpec, 2, new 
TextNode("active"))).
                     build()).
                 waitFor(cluster.coordinatorClient()).
                 waitFor(cluster.agentClient("node02"));
@@ -103,7 +108,7 @@ public void testCreateTask() throws Exception {
             time.sleep(3);
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskDone(fooSpec, 2, 5, "", false)).
+                    taskState(new TaskDone(fooSpec, 2, 5, "", false, new 
TextNode("done"))).
                     build()).
                 waitFor(cluster.coordinatorClient());
         }
@@ -131,26 +136,34 @@ public void testTaskDistribution() throws Exception {
             NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 2);
             coordinatorClient.createTask(new CreateTaskRequest("foo", 
fooSpec));
             new ExpectedTasks().
-                addTask(new ExpectedTaskBuilder("foo").taskState(new 
TaskPending(fooSpec)).build()).
+                addTask(new ExpectedTaskBuilder("foo").taskState(
+                    new TaskPending(fooSpec)).build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
                 waitFor(agentClient2);
 
             time.sleep(11);
+            ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
+            status1.set("node01", new TextNode("active"));
+            status1.set("node02", new TextNode("active"));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskRunning(fooSpec, 11)).
-                    workerState(new WorkerRunning(fooSpec, 11, "")).
+                    taskState(new TaskRunning(fooSpec, 11, status1)).
+                    workerState(new WorkerRunning(fooSpec, 11,  new 
TextNode("active"))).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
                 waitFor(agentClient2);
 
             time.sleep(2);
+            ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
+            status2.set("node01", new TextNode("done"));
+            status2.set("node02", new TextNode("done"));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskDone(fooSpec, 11, 13, "", false)).
-                    workerState(new WorkerDone(fooSpec, 11, 13, "", "")).
+                    taskState(new TaskDone(fooSpec, 11, 13,
+                        "", false, status2)).
+                    workerState(new WorkerDone(fooSpec, 11, 13, new 
TextNode("done"), "")).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
@@ -186,21 +199,29 @@ public void testTaskCancellation() throws Exception {
                 waitFor(agentClient2);
 
             time.sleep(11);
+
+            ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
+            status1.set("node01", new TextNode("active"));
+            status1.set("node02", new TextNode("active"));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskRunning(fooSpec, 11)).
-                    workerState(new WorkerRunning(fooSpec, 11, "")).
+                    taskState(new TaskRunning(fooSpec, 11, status1)).
+                    workerState(new WorkerRunning(fooSpec, 11, new 
TextNode("active"))).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
                 waitFor(agentClient2);
 
+            ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
+            status2.set("node01", new TextNode("done"));
+            status2.set("node02", new TextNode("done"));
             time.sleep(1);
             coordinatorClient.stopTask(new StopTaskRequest("foo"));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskDone(fooSpec, 11, 12, "", true)).
-                    workerState(new WorkerDone(fooSpec, 11, 12, "", "")).
+                    taskState(new TaskDone(fooSpec, 11, 12, "",
+                        true, status2)).
+                    workerState(new WorkerDone(fooSpec, 11, 12, new 
TextNode("done"), "")).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
@@ -375,8 +396,8 @@ public void testTasksRequest() throws Exception {
             time.sleep(2);
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskRunning(fooSpec, 2)).
-                    workerState(new WorkerRunning(fooSpec, 2, "")).
+                    taskState(new TaskRunning(fooSpec, 2, new 
TextNode("active"))).
+                    workerState(new WorkerRunning(fooSpec, 2, new 
TextNode("active"))).
                     build()).
                 addTask(new ExpectedTaskBuilder("bar").
                     taskState(new TaskPending(barSpec)).
@@ -394,4 +415,73 @@ public void testTasksRequest() throws Exception {
                 new TasksRequest(null, 3, 0, 0, 0)).tasks().size());
         }
     }
+
+    @Test
+    public void testWorkersExitingAtDifferentTimes() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        Scheduler scheduler = new MockScheduler(time);
+        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+            addCoordinator("node01").
+            addAgent("node02").
+            addAgent("node03").
+            scheduler(scheduler).
+            build()) {
+            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+            new ExpectedTasks().waitFor(coordinatorClient);
+
+            HashMap<String, Long> nodeToExitMs = new HashMap<>();
+            nodeToExitMs.put("node02", 10L);
+            nodeToExitMs.put("node03", 20L);
+            SampleTaskSpec fooSpec =
+                new SampleTaskSpec(2, 100, nodeToExitMs, "");
+            coordinatorClient.createTask(new CreateTaskRequest("foo", 
fooSpec));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskPending(fooSpec)).
+                    build()).
+                waitFor(coordinatorClient);
+
+            time.sleep(2);
+            ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
+            status1.set("node02", new TextNode("active"));
+            status1.set("node03", new TextNode("active"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 2, status1)).
+                    workerState(new WorkerRunning(fooSpec, 2, new 
TextNode("active"))).
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(cluster.agentClient("node02")).
+                waitFor(cluster.agentClient("node03"));
+
+            time.sleep(10);
+            ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
+            status2.set("node02", new TextNode("halted"));
+            status2.set("node03", new TextNode("active"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 2, status2)).
+                    workerState(new WorkerRunning(fooSpec, 2, new 
TextNode("active"))).
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(cluster.agentClient("node03"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 2, status2)).
+                    workerState(new WorkerDone(fooSpec, 2, 12, new 
TextNode("halted"), "")).
+                    build()).
+                waitFor(cluster.agentClient("node02"));
+
+            time.sleep(10);
+            ObjectNode status3 = new ObjectNode(JsonNodeFactory.instance);
+            status3.set("node02", new TextNode("halted"));
+            status3.set("node03", new TextNode("halted"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskDone(fooSpec, 2, 22, "",
+                        false, status3)).
+                    build()).
+                waitFor(coordinatorClient);
+        }
+    }
 };
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java 
b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
index 26fdfb273ec..38a160fba21 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
@@ -20,23 +20,28 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 public class SampleTaskSpec extends TaskSpec {
-    private final long exitMs;
+    private final Map<String, Long> nodeToExitMs;
     private final String error;
 
     @JsonCreator
     public SampleTaskSpec(@JsonProperty("startMs") long startMs,
                         @JsonProperty("durationMs") long durationMs,
-                        @JsonProperty("exitMs") long exitMs,
+                        @JsonProperty("nodeToExitMs") Map<String, Long> 
nodeToExitMs,
                         @JsonProperty("error") String error) {
         super(startMs, durationMs);
-        this.exitMs = exitMs;
+        this.nodeToExitMs = nodeToExitMs == null ? new HashMap<String, Long>() 
:
+            Collections.unmodifiableMap(nodeToExitMs);
         this.error = error == null ? "" : error;
     }
 
     @JsonProperty
-    public long exitMs() {
-        return exitMs;
+    public Map<String, Long> nodeToExitMs() {
+        return nodeToExitMs;
     }
 
     @JsonProperty
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java 
b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
index ebac27e466e..ade055d3d06 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.trogdor.task;
 
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
@@ -26,12 +27,12 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class SampleTaskWorker implements TaskWorker {
     private final SampleTaskSpec spec;
     private final ScheduledExecutorService executor;
     private Future<Void> future;
+    private WorkerStatusTracker status;
 
     SampleTaskWorker(SampleTaskSpec spec) {
         this.spec = spec;
@@ -41,17 +42,24 @@
     }
 
     @Override
-    public synchronized void start(Platform platform, AtomicReference<String> 
status,
+    public synchronized void start(Platform platform, WorkerStatusTracker 
status,
                       final KafkaFutureImpl<String> haltFuture) throws 
Exception {
         if (this.future != null)
             return;
+        this.status = status;
+        this.status.update(new TextNode("active"));
+
+        Long exitMs = spec.nodeToExitMs().get(platform.curNode().name());
+        if (exitMs == null) {
+            exitMs = Long.MAX_VALUE;
+        }
         this.future = platform.scheduler().schedule(executor, new 
Callable<Void>() {
             @Override
             public Void call() throws Exception {
                 haltFuture.complete(spec.error());
                 return null;
             }
-        }, spec.exitMs());
+        }, exitMs);
     }
 
     @Override
@@ -59,5 +67,6 @@ public void stop(Platform platform) throws Exception {
         this.future.cancel(false);
         this.executor.shutdown();
         this.executor.awaitTermination(1, TimeUnit.DAYS);
+        this.status.update(new TextNode("halted"));
     }
 };
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java 
b/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java
index abd7e6250ff..d8d4ca9d5fb 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java
@@ -40,11 +40,11 @@ public void testTaskSpecSerialization() throws Exception {
         } catch (InvalidTypeIdException e) {
         }
         String inputJson = 
"{\"class\":\"org.apache.kafka.trogdor.task.SampleTaskSpec\"," +
-            
"\"startMs\":123,\"durationMs\":456,\"exitMs\":1000,\"error\":\"foo\"}";
+            
"\"startMs\":123,\"durationMs\":456,\"nodeToExitMs\":{\"node01\":1000},\"error\":\"foo\"}";
         SampleTaskSpec spec = JsonUtil.JSON_SERDE.readValue(inputJson, 
SampleTaskSpec.class);
         assertEquals(123, spec.startMs());
         assertEquals(456, spec.durationMs());
-        assertEquals(1000, spec.exitMs());
+        assertEquals(Long.valueOf(1000), spec.nodeToExitMs().get("node01"));
         assertEquals("foo", spec.error());
         String outputJson = JsonUtil.toJsonString(spec);
         assertEquals(inputJson, outputJson);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The Trogdor coordinator should track task statuses
> --------------------------------------------------
>
>                 Key: KAFKA-6688
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6688
>             Project: Kafka
>          Issue Type: Improvement
>          Components: system tests
>            Reporter: Colin P. McCabe
>            Assignee: Colin P. McCabe
>            Priority: Major
>
> The Trogdor coordinator should track task statuses



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to