This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 40183e3 KAFKA-6688. The Trogdor coordinator should track task statuses (#4737) 40183e3 is described below commit 40183e31567795d4d0f2b836294bc5d5fac2a56b Author: Colin Patrick McCabe <co...@cmccabe.xyz> AuthorDate: Sun Apr 8 01:35:33 2018 -0700 KAFKA-6688. The Trogdor coordinator should track task statuses (#4737) Reviewers: Anna Povzner <a...@confluent.io>, Rajini Sivaram <rajinisiva...@googlemail.com> --- .../apache/kafka/trogdor/agent/WorkerManager.java | 10 +- .../kafka/trogdor/coordinator/NodeManager.java | 47 +++---- .../kafka/trogdor/coordinator/TaskManager.java | 147 +++++++++++++-------- .../kafka/trogdor/fault/KiboshFaultWorker.java | 13 +- .../trogdor/fault/NetworkPartitionFaultWorker.java | 12 +- .../trogdor/fault/ProcessStopFaultWorker.java | 12 +- .../org/apache/kafka/trogdor/rest/TaskDone.java | 7 +- .../org/apache/kafka/trogdor/rest/TaskPending.java | 3 +- .../org/apache/kafka/trogdor/rest/TaskRunning.java | 6 +- .../org/apache/kafka/trogdor/rest/TaskState.java | 12 +- .../apache/kafka/trogdor/rest/TaskStopping.java | 6 +- .../org/apache/kafka/trogdor/rest/WorkerDone.java | 10 +- .../apache/kafka/trogdor/rest/WorkerReceiving.java | 7 + .../apache/kafka/trogdor/rest/WorkerRunning.java | 10 +- .../apache/kafka/trogdor/rest/WorkerStarting.java | 7 + .../org/apache/kafka/trogdor/rest/WorkerState.java | 5 +- .../apache/kafka/trogdor/rest/WorkerStopping.java | 10 +- .../AgentWorkerStatusTracker.java} | 29 ++-- .../apache/kafka/trogdor/task/NoOpTaskWorker.java | 10 +- .../org/apache/kafka/trogdor/task/TaskWorker.java | 6 +- .../WorkerStatusTracker.java} | 20 +-- .../kafka/trogdor/workload/ProduceBenchWorker.java | 52 +++++--- .../kafka/trogdor/workload/RoundTripWorker.java | 4 +- .../org/apache/kafka/trogdor/agent/AgentTest.java | 57 ++++---- .../trogdor/common/JsonSerializationTest.java | 2 +- .../kafka/trogdor/coordinator/CoordinatorTest.java | 118 +++++++++++++++-- .../apache/kafka/trogdor/task/SampleTaskSpec.java | 15 ++- .../kafka/trogdor/task/SampleTaskWorker.java | 15 ++- .../apache/kafka/trogdor/task/TaskSpecTest.java | 4 +- 29 files changed, 441 insertions(+), 215 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java index cda7773..7c8de6d 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.utils.Scheduler; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.rest.WorkerDone; @@ -29,6 +30,7 @@ import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.rest.WorkerStarting; import org.apache.kafka.trogdor.rest.WorkerStopping; import org.apache.kafka.trogdor.rest.WorkerState; +import org.apache.kafka.trogdor.task.AgentWorkerStatusTracker; import org.apache.kafka.trogdor.task.TaskSpec; import org.apache.kafka.trogdor.task.TaskWorker; import org.slf4j.Logger; @@ -43,7 +45,6 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; public final class WorkerManager { private static final Logger log = LoggerFactory.getLogger(WorkerManager.class); @@ -190,7 +191,7 @@ public final class WorkerManager { /** * The worker status. */ - private final AtomicReference<String> status = new AtomicReference<>(""); + private final AgentWorkerStatusTracker status = new AgentWorkerStatusTracker(); /** * The time when this task was started. @@ -293,6 +294,8 @@ public final class WorkerManager { haltFuture.thenApply(new KafkaFuture.BaseFunction<String, Void>() { @Override public Void apply(String errorString) { + if (errorString == null) + errorString = ""; if (errorString.isEmpty()) { log.info("{}: Worker {} is halting.", nodeName, id); } else { @@ -306,8 +309,9 @@ public final class WorkerManager { try { worker.taskWorker.start(platform, worker.status, haltFuture); } catch (Exception e) { + log.info("{}: Worker {} start() exception", nodeName, id, e); stateChangeExecutor.submit(new HandleWorkerHalting(worker, - "worker.start() exception: " + e.getMessage(), true)); + "worker.start() exception: " + Utils.stackTrace(e), true)); } stateChangeExecutor.submit(new FinishCreatingWorker(worker)); } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java index 0129007..91ef9c2 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java @@ -49,7 +49,6 @@ import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.rest.AgentStatusResponse; import org.apache.kafka.trogdor.rest.CreateWorkerRequest; import org.apache.kafka.trogdor.rest.StopWorkerRequest; -import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerReceiving; import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.rest.WorkerStarting; @@ -192,6 +191,9 @@ public final class NodeManager { // agents going down? return; } + if (log.isTraceEnabled()) { + log.trace("{}: got heartbeat status {}", node.name(), agentStatus); + } // Identify workers which we think should be running, but which do not appear // in the agent's response. We need to send startWorker requests for these. for (Map.Entry<String, ManagedWorker> entry : workers.entrySet()) { @@ -203,40 +205,31 @@ public final class NodeManager { } } } - // Identify tasks which are running, but which we don't know about. - // Add these to the NodeManager as tasks that should not be running. for (Map.Entry<String, WorkerState> entry : agentStatus.workers().entrySet()) { String id = entry.getKey(); WorkerState state = entry.getValue(); - if (!workers.containsKey(id)) { + ManagedWorker worker = workers.get(id); + if (worker == null) { + // Identify tasks which are running, but which we don't know about. + // Add these to the NodeManager as tasks that should not be running. log.warn("{}: scheduling unknown worker {} for stopping.", node.name(), id); workers.put(id, new ManagedWorker(id, state.spec(), false, state)); - } - } - // Handle workers which need to be stopped. Handle workers which have newly completed. - for (Map.Entry<String, WorkerState> entry : agentStatus.workers().entrySet()) { - String id = entry.getKey(); - WorkerState state = entry.getValue(); - ManagedWorker worker = workers.get(id); - if (state instanceof WorkerStarting || state instanceof WorkerRunning) { - if (!worker.shouldRun) { - worker.tryStop(); - } - } else if (state instanceof WorkerDone) { - if (!(worker.state instanceof WorkerDone)) { - WorkerDone workerDoneState = (WorkerDone) state; - String error = workerDoneState.error(); - if (error.isEmpty()) { - log.info("{}: Worker {} finished with status '{}'", - node.name(), id, workerDoneState.status()); - } else { - log.warn("{}: Worker {} finished with error '{}' and status '{}'", - node.name(), id, error, workerDoneState.status()); + } else { + // Handle workers which need to be stopped. + if (state instanceof WorkerStarting || state instanceof WorkerRunning) { + if (!worker.shouldRun) { + worker.tryStop(); } - taskManager.handleWorkerCompletion(node.name(), worker.id, error); + } + // Notify the TaskManager if the worker state has changed. + if (worker.state.equals(state)) { + log.debug("{}: worker state is still {}", node.name(), worker.state); + } else { + log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state); + worker.state = state; + taskManager.updateWorkerState(node.name(), worker.id, state); } } - worker.state = state; } } catch (Throwable e) { log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java index d88e1d5..7e19c8b 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java @@ -17,10 +17,14 @@ package org.apache.kafka.trogdor.coordinator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.Scheduler; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.Node; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.common.ThreadUtils; @@ -31,13 +35,15 @@ import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TaskStopping; import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TasksResponse; +import org.apache.kafka.trogdor.rest.WorkerDone; +import org.apache.kafka.trogdor.rest.WorkerReceiving; +import org.apache.kafka.trogdor.rest.WorkerState; import org.apache.kafka.trogdor.task.TaskController; import org.apache.kafka.trogdor.task.TaskSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -172,16 +178,9 @@ public final class TaskManager { private Future<?> startFuture = null; /** - * The name of the worker nodes involved with this task. - * Null if the task is not running. + * The states of the workers involved with this task. */ - private Set<String> workers = null; - - /** - * The names of the worker nodes which are still running this task. - * Null if the task is not running. - */ - private Set<String> activeWorkers = null; + public Map<String, WorkerState> workerStates = new TreeMap<>(); /** * If this is non-empty, a message describing how this task failed. @@ -241,14 +240,39 @@ public final class TaskManager { case PENDING: return new TaskPending(spec); case RUNNING: - return new TaskRunning(spec, startedMs); + return new TaskRunning(spec, startedMs, getCombinedStatus(workerStates)); case STOPPING: - return new TaskStopping(spec, startedMs); + return new TaskStopping(spec, startedMs, getCombinedStatus(workerStates)); case DONE: - return new TaskDone(spec, startedMs, doneMs, error, cancelled); + return new TaskDone(spec, startedMs, doneMs, error, cancelled, getCombinedStatus(workerStates)); } throw new RuntimeException("unreachable"); } + + TreeSet<String> activeWorkers() { + TreeSet<String> workerNames = new TreeSet<>(); + for (Map.Entry<String, WorkerState> entry : workerStates.entrySet()) { + if (!entry.getValue().done()) { + workerNames.add(entry.getKey()); + } + } + return workerNames; + } + } + + private static final JsonNode getCombinedStatus(Map<String, WorkerState> states) { + if (states.size() == 1) { + return states.values().iterator().next().status(); + } else { + ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance); + for (Map.Entry<String, WorkerState> entry : states.entrySet()) { + JsonNode node = entry.getValue().status(); + if (node != null) { + objectNode.set(entry.getKey(), node); + } + } + return objectNode; + } } /** @@ -349,10 +373,8 @@ public final class TaskManager { log.info("Running task {} on node(s): {}", task.id, Utils.join(nodeNames, ", ")); task.state = ManagedTaskState.RUNNING; task.startedMs = time.milliseconds(); - task.workers = nodeNames; - task.activeWorkers = new HashSet<>(); - for (String workerName : task.workers) { - task.activeWorkers.add(workerName); + for (String workerName : nodeNames) { + task.workerStates.put(workerName, new WorkerReceiving(task.spec)); nodeManagers.get(workerName).createWorker(task.id, task.spec); } return null; @@ -398,15 +420,16 @@ public final class TaskManager { break; case RUNNING: task.cancelled = true; - if (task.activeWorkers.size() == 0) { + TreeSet<String> activeWorkers = task.activeWorkers(); + if (activeWorkers.isEmpty()) { log.info("Task {} is now complete with error: {}", id, task.error); task.doneMs = time.milliseconds(); task.state = ManagedTaskState.DONE; } else { - for (String workerName : task.activeWorkers) { + for (String workerName : activeWorkers) { nodeManagers.get(workerName).stopWorker(id); } - log.info("Cancelling task {} on worker(s): {}", id, Utils.join(task.activeWorkers, ", ")); + log.info("Cancelling task {} on worker(s): {}", id, Utils.join(activeWorkers, ", ")); task.state = ManagedTaskState.STOPPING; } break; @@ -422,66 +445,80 @@ public final class TaskManager { } /** - * A callback NodeManager makes to indicate that a worker has completed. - * The task will transition to DONE once all workers are done. + * Update the state of a particular agent's worker. * - * @param nodeName The node name. + * @param nodeName The node where the agent is running. * @param id The worker name. - * @param error An empty string if there is no error, or an error string. + * @param state The worker state. */ - public void handleWorkerCompletion(String nodeName, String id, String error) { - executor.submit(new HandleWorkerCompletion(nodeName, id, error)); + public void updateWorkerState(String nodeName, String id, WorkerState state) { + executor.submit(new UpdateWorkerState(nodeName, id, state)); } - class HandleWorkerCompletion implements Callable<Void> { + class UpdateWorkerState implements Callable<Void> { private final String nodeName; private final String id; - private final String error; + private final WorkerState state; - HandleWorkerCompletion(String nodeName, String id, String error) { + UpdateWorkerState(String nodeName, String id, WorkerState state) { this.nodeName = nodeName; this.id = id; - this.error = error; + this.state = state; } @Override public Void call() throws Exception { ManagedTask task = tasks.get(id); if (task == null) { - log.error("Can't handle completion of unknown worker {} on node {}", + log.error("Can't update worker state unknown worker {} on node {}", id, nodeName); return null; } - if ((task.state == ManagedTaskState.PENDING) || (task.state == ManagedTaskState.DONE)) { - log.error("Task {} got unexpected worker completion from {} while " + - "in {} state.", id, nodeName, task.state); - return null; - } - boolean broadcastStop = false; - if (task.state == ManagedTaskState.RUNNING) { - task.state = ManagedTaskState.STOPPING; - broadcastStop = true; - } - task.maybeSetError(error); - task.activeWorkers.remove(nodeName); - if (task.activeWorkers.size() == 0) { - task.doneMs = time.milliseconds(); - task.state = ManagedTaskState.DONE; - log.info("Task {} is now complete on {} with error: {}", id, - Utils.join(task.workers, ", "), - task.error.isEmpty() ? "(none)" : task.error); - } else if (broadcastStop) { - log.info("Node {} stopped. Stopping task {} on worker(s): {}", - id, Utils.join(task.activeWorkers, ", ")); - for (String workerName : task.activeWorkers) { - nodeManagers.get(workerName).stopWorker(id); - } + WorkerState prevState = task.workerStates.get(nodeName); + log.debug("Task {}: Updating worker state for {} from {} to {}.", + id, nodeName, prevState, state); + task.workerStates.put(nodeName, state); + if (state.done() && (!prevState.done())) { + handleWorkerCompletion(task, nodeName, (WorkerDone) state); } return null; } } /** + * Handle a worker being completed. + * + * @param task The task that owns the worker. + * @param nodeName The name of the node on which the worker is running. + * @param state The worker state. + */ + private void handleWorkerCompletion(ManagedTask task, String nodeName, WorkerDone state) { + if (state.error().isEmpty()) { + log.info("{}: Worker {} finished with status '{}'", + nodeName, task.id, JsonUtil.toJsonString(state.status())); + } else { + log.warn("{}: Worker {} finished with error '{}' and status '{}'", + nodeName, task.id, state.error(), JsonUtil.toJsonString(state.status())); + task.maybeSetError(state.error()); + } + if (task.activeWorkers().isEmpty()) { + task.doneMs = time.milliseconds(); + task.state = ManagedTaskState.DONE; + log.info("{}: Task {} is now complete on {} with error: {}", + nodeName, task.id, Utils.join(task.workerStates.keySet(), ", "), + task.error.isEmpty() ? "(none)" : task.error); + } else if ((task.state == ManagedTaskState.RUNNING) && (!task.error.isEmpty())) { + TreeSet<String> activeWorkers = task.activeWorkers(); + log.info("{}: task {} stopped with error {}. Stopping worker(s): {}", + nodeName, task.id, task.error, Utils.join(activeWorkers, ", ")); + task.state = ManagedTaskState.STOPPING; + for (String workerName : activeWorkers) { + nodeManagers.get(workerName).stopWorker(task.id); + } + } + } + + /** * Get information about the tasks being managed. */ public TasksResponse tasks(TasksRequest request) throws ExecutionException, InterruptedException { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java index 629d15e..97934a8 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java @@ -17,15 +17,15 @@ package org.apache.kafka.trogdor.fault; +import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.fault.Kibosh.KiboshFaultSpec; import org.apache.kafka.trogdor.task.TaskWorker; +import org.apache.kafka.trogdor.task.WorkerStatusTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicReference; - public class KiboshFaultWorker implements TaskWorker { private static final Logger log = LoggerFactory.getLogger(KiboshFaultWorker.class); @@ -35,6 +35,8 @@ public class KiboshFaultWorker implements TaskWorker { private final String mountPath; + private WorkerStatusTracker status; + public KiboshFaultWorker(String id, KiboshFaultSpec spec, String mountPath) { this.id = id; this.spec = spec; @@ -42,15 +44,20 @@ public class KiboshFaultWorker implements TaskWorker { } @Override - public void start(Platform platform, AtomicReference<String> status, + public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> errorFuture) throws Exception { log.info("Activating {} {}: {}.", spec.getClass().getSimpleName(), id, spec); + this.status = status; + this.status.update(new TextNode("Adding fault " + id)); Kibosh.INSTANCE.addFault(mountPath, spec); + this.status.update(new TextNode("Added fault " + id)); } @Override public void stop(Platform platform) throws Exception { log.info("Deactivating {} {}: {}.", spec.getClass().getSimpleName(), id, spec); + this.status.update(new TextNode("Removing fault " + id)); Kibosh.INSTANCE.removeFault(mountPath, spec); + this.status.update(new TextNode("Removed fault " + id)); } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java index 787c5e0..1b99a93 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java @@ -17,11 +17,13 @@ package org.apache.kafka.trogdor.fault; +import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.trogdor.common.Node; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.common.Topology; import org.apache.kafka.trogdor.task.TaskWorker; +import org.apache.kafka.trogdor.task.WorkerStatusTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +31,6 @@ import java.net.InetAddress; import java.util.List; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicReference; public class NetworkPartitionFaultWorker implements TaskWorker { private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFaultWorker.class); @@ -38,22 +39,29 @@ public class NetworkPartitionFaultWorker implements TaskWorker { private final List<Set<String>> partitionSets; + private WorkerStatusTracker status; + public NetworkPartitionFaultWorker(String id, List<Set<String>> partitionSets) { this.id = id; this.partitionSets = partitionSets; } @Override - public void start(Platform platform, AtomicReference<String> status, + public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> errorFuture) throws Exception { log.info("Activating NetworkPartitionFault {}.", id); + this.status = status; + this.status.update(new TextNode("creating network partition " + id)); runIptablesCommands(platform, "-A"); + this.status.update(new TextNode("created network partition " + id)); } @Override public void stop(Platform platform) throws Exception { log.info("Deactivating NetworkPartitionFault {}.", id); + this.status.update(new TextNode("removing network partition " + id)); runIptablesCommands(platform, "-D"); + this.status.update(new TextNode("removed network partition " + id)); } private void runIptablesCommands(Platform platform, String iptablesAction) throws Exception { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java index 66a8c6e..d30eaf7 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java @@ -17,16 +17,17 @@ package org.apache.kafka.trogdor.fault; +import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.task.TaskWorker; +import org.apache.kafka.trogdor.task.WorkerStatusTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; public class ProcessStopFaultWorker implements TaskWorker { private static final Logger log = LoggerFactory.getLogger(ProcessStopFaultWorker.class); @@ -35,22 +36,29 @@ public class ProcessStopFaultWorker implements TaskWorker { private final String javaProcessName; + private WorkerStatusTracker status; + public ProcessStopFaultWorker(String id, String javaProcessName) { this.id = id; this.javaProcessName = javaProcessName; } @Override - public void start(Platform platform, AtomicReference<String> status, + public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> errorFuture) throws Exception { + this.status = status; log.info("Activating ProcessStopFault {}.", id); + this.status.update(new TextNode("stopping " + javaProcessName)); sendSignals(platform, "SIGSTOP"); + this.status.update(new TextNode("stopped " + javaProcessName)); } @Override public void stop(Platform platform) throws Exception { log.info("Deactivating ProcessStopFault {}.", id); + this.status.update(new TextNode("resuming " + javaProcessName)); sendSignals(platform, "SIGCONT"); + this.status.update(new TextNode("resumed " + javaProcessName)); } private void sendSignals(Platform platform, String signalName) throws Exception { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java index 536d3f2..e8d6003 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java @@ -16,9 +16,9 @@ */ package org.apache.kafka.trogdor.rest; - import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.trogdor.task.TaskSpec; /** @@ -50,8 +50,9 @@ public class TaskDone extends TaskState { @JsonProperty("startedMs") long startedMs, @JsonProperty("doneMs") long doneMs, @JsonProperty("error") String error, - @JsonProperty("cancelled") boolean cancelled) { - super(spec); + @JsonProperty("cancelled") boolean cancelled, + @JsonProperty("status") JsonNode status) { + super(spec, status); this.startedMs = startedMs; this.doneMs = doneMs; this.error = error; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java index b0162d3..7831301 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java @@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.rest; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.node.NullNode; import org.apache.kafka.trogdor.task.TaskSpec; /** @@ -27,6 +28,6 @@ import org.apache.kafka.trogdor.task.TaskSpec; public class TaskPending extends TaskState { @JsonCreator public TaskPending(@JsonProperty("spec") TaskSpec spec) { - super(spec); + super(spec, NullNode.instance); } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java index bff3676..7a81bdf 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java @@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.rest; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.trogdor.task.TaskSpec; /** @@ -32,8 +33,9 @@ public class TaskRunning extends TaskState { @JsonCreator public TaskRunning(@JsonProperty("spec") TaskSpec spec, - @JsonProperty("startedMs") long startedMs) { - super(spec); + @JsonProperty("startedMs") long startedMs, + @JsonProperty("status") JsonNode status) { + super(spec, status); this.startedMs = startedMs; } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java index 28b6108..0764e14 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java @@ -20,6 +20,8 @@ package org.apache.kafka.trogdor.rest; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; import org.apache.kafka.trogdor.task.TaskSpec; /** @@ -37,12 +39,20 @@ import org.apache.kafka.trogdor.task.TaskSpec; public abstract class TaskState extends Message { private final TaskSpec spec; - public TaskState(TaskSpec spec) { + private final JsonNode status; + + public TaskState(TaskSpec spec, JsonNode status) { this.spec = spec; + this.status = status == null ? NullNode.instance : status; } @JsonProperty public TaskSpec spec() { return spec; } + + @JsonProperty + public JsonNode status() { + return status; + } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java index 4446b75..d40b43c 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java @@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.rest; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.trogdor.task.TaskSpec; /** @@ -32,8 +33,9 @@ public class TaskStopping extends TaskState { @JsonCreator public TaskStopping(@JsonProperty("spec") TaskSpec spec, - @JsonProperty("startedMs") long startedMs) { - super(spec); + @JsonProperty("startedMs") long startedMs, + @JsonProperty("status") JsonNode status) { + super(spec, status); this.startedMs = startedMs; } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java index e463ffc..500d3c6 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java @@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; import org.apache.kafka.trogdor.task.TaskSpec; /** @@ -39,7 +41,7 @@ public class WorkerDone extends WorkerState { * The task status. The format will depend on the type of task that is * being run. */ - private final String status; + private final JsonNode status; /** * Empty if the task completed without error; the error message otherwise. @@ -50,12 +52,12 @@ public class WorkerDone extends WorkerState { public WorkerDone(@JsonProperty("spec") TaskSpec spec, @JsonProperty("startedMs") long startedMs, @JsonProperty("doneMs") long doneMs, - @JsonProperty("status") String status, + @JsonProperty("status") JsonNode status, @JsonProperty("error") String error) { super(spec); this.startedMs = startedMs; this.doneMs = doneMs; - this.status = status == null ? "" : status; + this.status = status == null ? NullNode.instance : status; this.error = error == null ? "" : error; } @@ -72,7 +74,7 @@ public class WorkerDone extends WorkerState { @JsonProperty @Override - public String status() { + public JsonNode status() { return status; } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java index d3e3565..7068774 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java @@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.trogdor.task.TaskSpec; /** @@ -30,4 +32,9 @@ public final class WorkerReceiving extends WorkerState { public WorkerReceiving(@JsonProperty("spec") TaskSpec spec) { super(spec); } + + @Override + public JsonNode status() { + return new TextNode("receiving"); + } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java index e3b8d19..af8ee88 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java @@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; import org.apache.kafka.trogdor.task.TaskSpec; /** @@ -34,15 +36,15 @@ public class WorkerRunning extends WorkerState { * The task status. The format will depend on the type of task that is * being run. */ - private final String status; + private final JsonNode status; @JsonCreator public WorkerRunning(@JsonProperty("spec") TaskSpec spec, @JsonProperty("startedMs") long startedMs, - @JsonProperty("status") String status) { + @JsonProperty("status") JsonNode status) { super(spec); this.startedMs = startedMs; - this.status = status == null ? "" : status; + this.status = status == null ? NullNode.instance : status; } @JsonProperty @@ -53,7 +55,7 @@ public class WorkerRunning extends WorkerState { @JsonProperty @Override - public String status() { + public JsonNode status() { return status; } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java index 3a766ea..b568ec1 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java @@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.trogdor.task.TaskSpec; /** @@ -29,4 +31,9 @@ public final class WorkerStarting extends WorkerState { public WorkerStarting(@JsonProperty("spec") TaskSpec spec) { super(spec); } + + @Override + public JsonNode status() { + return new TextNode("starting"); + } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java index 6d7c687..044d719 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java @@ -20,6 +20,7 @@ package org.apache.kafka.trogdor.rest; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.common.KafkaException; import org.apache.kafka.trogdor.task.TaskSpec; @@ -60,9 +61,7 @@ public abstract class WorkerState extends Message { throw new KafkaException("invalid state"); } - public String status() { - throw new KafkaException("invalid state"); - } + public abstract JsonNode status(); public boolean running() { return false; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java index 777e511..9fbb3ff 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java @@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; import org.apache.kafka.trogdor.task.TaskSpec; /** @@ -34,15 +36,15 @@ public class WorkerStopping extends WorkerState { * The task status. The format will depend on the type of task that is * being run. */ - private final String status; + private final JsonNode status; @JsonCreator public WorkerStopping(@JsonProperty("spec") TaskSpec spec, @JsonProperty("startedMs") long startedMs, - @JsonProperty("status") String status) { + @JsonProperty("status") JsonNode status) { super(spec); this.startedMs = startedMs; - this.status = status == null ? "" : status; + this.status = status == null ? NullNode.instance : status; } @JsonProperty @@ -53,7 +55,7 @@ public class WorkerStopping extends WorkerState { @JsonProperty @Override - public String status() { + public JsonNode status() { return status; } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java b/tools/src/main/java/org/apache/kafka/trogdor/task/AgentWorkerStatusTracker.java similarity index 57% copy from tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java copy to tools/src/main/java/org/apache/kafka/trogdor/task/AgentWorkerStatusTracker.java index 3a766ea..2ad8e4e 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/task/AgentWorkerStatusTracker.java @@ -15,18 +15,29 @@ * limitations under the License. */ -package org.apache.kafka.trogdor.rest; +package org.apache.kafka.trogdor.task; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.trogdor.task.TaskSpec; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; /** - * When we have just started a worker. + * Tracks the status of a Trogdor worker. */ -public final class WorkerStarting extends WorkerState { - @JsonCreator - public WorkerStarting(@JsonProperty("spec") TaskSpec spec) { - super(spec); +public class AgentWorkerStatusTracker implements WorkerStatusTracker { + private JsonNode status = NullNode.instance; + + @Override + public void update(JsonNode newStatus) { + JsonNode status = newStatus.deepCopy(); + synchronized (this) { + this.status = status; + } + } + + /** + * Retrieves the status. + */ + public synchronized JsonNode get() { + return status; } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java index dfa8084..77336d8 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java @@ -17,30 +17,34 @@ package org.apache.kafka.trogdor.task; +import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.trogdor.common.Platform; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicReference; - public class NoOpTaskWorker implements TaskWorker { private static final Logger log = LoggerFactory.getLogger(NoOpTaskWorker.class); private final String id; + private WorkerStatusTracker status; + public NoOpTaskWorker(String id) { this.id = id; } @Override - public void start(Platform platform, AtomicReference<String> status, + public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> errorFuture) throws Exception { log.info("{}: Activating NoOpTask.", id); + this.status = status; + this.status.update(new TextNode("active")); } @Override public void stop(Platform platform) throws Exception { log.info("{}: Deactivating NoOpTask.", id); + this.status.update(new TextNode("done")); } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java index 288eb9c..042568f 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java @@ -20,8 +20,6 @@ package org.apache.kafka.trogdor.task; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.trogdor.common.Platform; -import java.util.concurrent.atomic.AtomicReference; - /** * The agent-side interface for implementing tasks. */ @@ -42,7 +40,7 @@ public interface TaskWorker { * * * @param platform The platform to use. - * @param status The current status string. The TaskWorker can update + * @param status The current status. The TaskWorker can update * this at any time to provide an updated status. * @param haltFuture A future which the worker should complete if it halts. * If it is completed with an empty string, that means the task @@ -53,7 +51,7 @@ public interface TaskWorker { * * @throws Exception If the TaskWorker failed to start. stop() will not be invoked. */ - void start(Platform platform, AtomicReference<String> status, KafkaFutureImpl<String> haltFuture) + void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> haltFuture) throws Exception; /** diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java b/tools/src/main/java/org/apache/kafka/trogdor/task/WorkerStatusTracker.java similarity index 67% copy from tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java copy to tools/src/main/java/org/apache/kafka/trogdor/task/WorkerStatusTracker.java index b0162d3..dfbc7ea 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/task/WorkerStatusTracker.java @@ -15,18 +15,18 @@ * limitations under the License. */ -package org.apache.kafka.trogdor.rest; +package org.apache.kafka.trogdor.task; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.trogdor.task.TaskSpec; +import com.fasterxml.jackson.databind.JsonNode; /** - * The state for a task which is still pending. + * Tracks the status of a Trogdor worker. */ -public class TaskPending extends TaskState { - @JsonCreator - public TaskPending(@JsonProperty("spec") TaskSpec spec) { - super(spec); - } +public interface WorkerStatusTracker { + /** + * Updates the status. + * + * @param status The new status. + */ + void update(JsonNode status); } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java index a891b83..4c3095f 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java @@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.workload; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; @@ -34,6 +35,7 @@ import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.common.WorkerUtils; import org.apache.kafka.trogdor.task.TaskWorker; +import org.apache.kafka.trogdor.task.WorkerStatusTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +48,6 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; public class ProduceBenchWorker implements TaskWorker { private static final Logger log = LoggerFactory.getLogger(ProduceBenchWorker.class); @@ -61,7 +62,7 @@ public class ProduceBenchWorker implements TaskWorker { private ScheduledExecutorService executor; - private AtomicReference<String> status; + private WorkerStatusTracker status; private KafkaFutureImpl<String> doneFuture; @@ -81,7 +82,7 @@ public class ProduceBenchWorker implements TaskWorker { } @Override - public void start(Platform platform, AtomicReference<String> status, + public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> doneFuture) throws Exception { if (!running.compareAndSet(false, true)) { throw new IllegalStateException("ProducerBenchWorker is already running."); @@ -112,9 +113,10 @@ public class ProduceBenchWorker implements TaskWorker { newTopics.put(name, new NewTopic(name, spec.numPartitions(), spec.replicationFactor())); } + status.update(new TextNode("Creating " + spec.totalTopics() + " topic(s)")); WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(), spec.adminClientConf(), newTopics, false); - + status.update(new TextNode("Created " + spec.totalTopics() + " topic(s)")); executor.submit(new SendRecords()); } catch (Throwable e) { WorkerUtils.abort(log, "Prepare", e, doneFuture); @@ -181,7 +183,7 @@ public class ProduceBenchWorker implements TaskWorker { this.histogram = new Histogram(5000); int perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS); this.statusUpdaterFuture = executor.scheduleWithFixedDelay( - new StatusUpdater(histogram), 1, 1, TimeUnit.MINUTES); + new StatusUpdater(histogram), 30, 30, TimeUnit.SECONDS); Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers()); // add common client configs to producer properties, and then user-specified producer @@ -218,10 +220,10 @@ public class ProduceBenchWorker implements TaskWorker { WorkerUtils.abort(log, "SendRecords", e, doneFuture); } finally { statusUpdaterFuture.cancel(false); - new StatusUpdater(histogram).run(); + StatusData statusData = new StatusUpdater(histogram).update(); long curTimeMs = Time.SYSTEM.milliseconds(); log.info("Sent {} total record(s) in {} ms. status: {}", - histogram.summarize().numSamples(), curTimeMs - startTimeMs, status.get()); + histogram.summarize().numSamples(), curTimeMs - startTimeMs, statusData); } doneFuture.complete(""); return null; @@ -234,46 +236,54 @@ public class ProduceBenchWorker implements TaskWorker { public class StatusUpdater implements Runnable { private final Histogram histogram; - private final float[] percentiles; StatusUpdater(Histogram histogram) { this.histogram = histogram; - this.percentiles = new float[] {0.50f, 0.95f, 0.99f}; } @Override public void run() { try { - Histogram.Summary summary = histogram.summarize(percentiles); - StatusData statusData = new StatusData(summary.numSamples(), summary.average(), - summary.percentiles().get(0).value(), - summary.percentiles().get(1).value(), - summary.percentiles().get(2).value()); - String statusDataString = JsonUtil.toJsonString(statusData); - status.set(statusDataString); + update(); } catch (Exception e) { WorkerUtils.abort(log, "StatusUpdater", e, doneFuture); } } + + StatusData update() { + Histogram.Summary summary = histogram.summarize(StatusData.PERCENTILES); + StatusData statusData = new StatusData(summary.numSamples(), summary.average(), + summary.percentiles().get(0).value(), + summary.percentiles().get(1).value(), + summary.percentiles().get(2).value()); + status.update(JsonUtil.JSON_SERDE.valueToTree(statusData)); + return statusData; + } } public static class StatusData { private final long totalSent; private final float averageLatencyMs; private final int p50LatencyMs; - private final int p90LatencyMs; + private final int p95LatencyMs; private final int p99LatencyMs; + /** + * The percentiles to use when calculating the histogram data. + * These should match up with the p50LatencyMs, p95LatencyMs, etc. fields. + */ + final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f}; + @JsonCreator StatusData(@JsonProperty("totalSent") long totalSent, @JsonProperty("averageLatencyMs") float averageLatencyMs, @JsonProperty("p50LatencyMs") int p50latencyMs, - @JsonProperty("p90LatencyMs") int p90latencyMs, + @JsonProperty("p95LatencyMs") int p95latencyMs, @JsonProperty("p99LatencyMs") int p99latencyMs) { this.totalSent = totalSent; this.averageLatencyMs = averageLatencyMs; this.p50LatencyMs = p50latencyMs; - this.p90LatencyMs = p90latencyMs; + this.p95LatencyMs = p95latencyMs; this.p99LatencyMs = p99latencyMs; } @@ -293,8 +303,8 @@ public class ProduceBenchWorker implements TaskWorker { } @JsonProperty - public int p90LatencyMs() { - return p90LatencyMs; + public int p95LatencyMs() { + return p95LatencyMs; } @JsonProperty diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java index 08b11ac..12b0c08 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java @@ -39,6 +39,7 @@ import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.common.WorkerUtils; import org.apache.kafka.trogdor.task.TaskWorker; +import org.apache.kafka.trogdor.task.WorkerStatusTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +56,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; public class RoundTripWorker implements TaskWorker { private static final int THROTTLE_PERIOD_MS = 100; @@ -98,7 +98,7 @@ public class RoundTripWorker implements TaskWorker { } @Override - public void start(Platform platform, AtomicReference<String> status, + public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> doneFuture) throws Exception { if (!running.compareAndSet(false, true)) { throw new IllegalStateException("RoundTripWorker is already running."); diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java index 30d13b5..61de5c9 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.trogdor.agent; +import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.common.utils.MockScheduler; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Scheduler; @@ -122,7 +123,7 @@ public class AgentTest { CreateWorkerResponse response = client.createWorker(new CreateWorkerRequest("foo", fooSpec)); assertEquals(fooSpec.toString(), response.spec().toString()); new ExpectedTasks().addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerRunning(fooSpec, 0, "")). + workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))). build()). waitFor(client); @@ -131,10 +132,10 @@ public class AgentTest { client.createWorker(new CreateWorkerRequest("bar", barSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerRunning(fooSpec, 0, "")). + workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))). build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerRunning(barSpec, 0, "")). + workerState(new WorkerRunning(barSpec, 0, new TextNode("active"))). build()). waitFor(client); @@ -142,13 +143,13 @@ public class AgentTest { client.createWorker(new CreateWorkerRequest("baz", bazSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerRunning(fooSpec, 0, "")). + workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))). build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerRunning(barSpec, 0, "")). + workerState(new WorkerRunning(barSpec, 0, new TextNode("active"))). build()). addTask(new ExpectedTaskBuilder("baz"). - workerState(new WorkerRunning(bazSpec, 0, "")). + workerState(new WorkerRunning(bazSpec, 0, new TextNode("active"))). build()). waitFor(client); @@ -169,7 +170,7 @@ public class AgentTest { client.createWorker(new CreateWorkerRequest("foo", fooSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerRunning(fooSpec, 0, "")). + workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))). build()). waitFor(client); @@ -179,10 +180,10 @@ public class AgentTest { client.createWorker(new CreateWorkerRequest("bar", barSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerRunning(fooSpec, 0, "")). + workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))). build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerRunning(barSpec, 1, "")). + workerState(new WorkerRunning(barSpec, 1, new TextNode("active"))). build()). waitFor(client); @@ -190,10 +191,10 @@ public class AgentTest { new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerDone(fooSpec, 0, 2, "", "")). + workerState(new WorkerDone(fooSpec, 0, 2, new TextNode("done"), "")). build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerRunning(barSpec, 1, "")). + workerState(new WorkerRunning(barSpec, 1, new TextNode("active"))). build()). waitFor(client); @@ -201,10 +202,10 @@ public class AgentTest { client.stopWorker(new StopWorkerRequest("bar")); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerDone(fooSpec, 0, 2, "", "")). + workerState(new WorkerDone(fooSpec, 0, 2, new TextNode("done"), "")). build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerDone(barSpec, 1, 7, "", "")). + workerState(new WorkerDone(barSpec, 1, 7, new TextNode("done"), "")). build()). waitFor(client); @@ -221,34 +222,40 @@ public class AgentTest { maxTries(10).target("localhost", agent.port()).build(); new ExpectedTasks().waitFor(client); - SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000, 1, ""); + SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000, + Collections.singletonMap("node01", 1L), ""); client.createWorker(new CreateWorkerRequest("foo", fooSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerRunning(fooSpec, 0, "")). + workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))). build()). waitFor(client); - SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000, 2, "baz"); + SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000, + Collections.singletonMap("node01", 2L), "baz"); client.createWorker(new CreateWorkerRequest("bar", barSpec)); time.sleep(1); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerDone(fooSpec, 0, 1, "", "")). + workerState(new WorkerDone(fooSpec, 0, 1, + new TextNode("halted"), "")). build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerRunning(barSpec, 0, "")). + workerState(new WorkerRunning(barSpec, 0, + new TextNode("active"))). build()). waitFor(client); time.sleep(1); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerDone(fooSpec, 0, 1, "", "")). + workerState(new WorkerDone(fooSpec, 0, 1, + new TextNode("halted"), "")). build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerDone(barSpec, 0, 2, "", "baz")). + workerState(new WorkerDone(barSpec, 0, 2, + new TextNode("halted"), "baz")). build()). waitFor(client); } @@ -289,7 +296,7 @@ public class AgentTest { client.createWorker(new CreateWorkerRequest("foo", fooSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerRunning(fooSpec, 0, "")). + workerState(new WorkerRunning(fooSpec, 0, new TextNode("Added fault foo"))). build()). waitFor(client); Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList( @@ -299,9 +306,9 @@ public class AgentTest { client.createWorker(new CreateWorkerRequest("bar", barSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerRunning(fooSpec, 0, "")).build()). + workerState(new WorkerRunning(fooSpec, 0, new TextNode("Added fault foo"))).build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerRunning(barSpec, 0, "")).build()). + workerState(new WorkerRunning(barSpec, 0, new TextNode("Added fault bar"))).build()). waitFor(client); Assert.assertEquals(new KiboshControlFile(new ArrayList<Kibosh.KiboshFaultSpec>() {{ add(new KiboshFilesUnreadableFaultSpec("/foo", 123)); @@ -311,9 +318,9 @@ public class AgentTest { client.stopWorker(new StopWorkerRequest("foo")); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerDone(fooSpec, 0, 1, "", "")).build()). + workerState(new WorkerDone(fooSpec, 0, 1, new TextNode("Removed fault foo"), "")).build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerRunning(barSpec, 0, "")).build()). + workerState(new WorkerRunning(barSpec, 0, new TextNode("Added fault bar"))).build()). waitFor(client); Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList( new KiboshFilesUnreadableFaultSpec("/bar", 456))), mockKibosh.read()); diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java index 76b206b..8101d9c 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java @@ -52,7 +52,7 @@ public class JsonSerializationTest { 0, 0, null, null, null, null, null, 0, 0, "test-topic", 1, (short) 3)); verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null, 0, null, null, 0)); - verify(new SampleTaskSpec(0, 0, 0, null)); + verify(new SampleTaskSpec(0, 0, null, null)); } private <T> void verify(T val1) throws Exception { diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java index 004702f..34d7ffe 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java @@ -17,6 +17,9 @@ package org.apache.kafka.trogdor.coordinator; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.common.utils.MockScheduler; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Scheduler; @@ -41,6 +44,7 @@ import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.task.NoOpTaskSpec; +import org.apache.kafka.trogdor.task.SampleTaskSpec; import org.junit.Rule; import org.junit.rules.Timeout; import org.slf4j.Logger; @@ -49,6 +53,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import static org.junit.Assert.assertEquals; @@ -94,8 +99,8 @@ public class CoordinatorTest { time.sleep(2); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - taskState(new TaskRunning(fooSpec, 2)). - workerState(new WorkerRunning(fooSpec, 2, "")). + taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))). + workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))). build()). waitFor(cluster.coordinatorClient()). waitFor(cluster.agentClient("node02")); @@ -103,7 +108,7 @@ public class CoordinatorTest { time.sleep(3); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - taskState(new TaskDone(fooSpec, 2, 5, "", false)). + taskState(new TaskDone(fooSpec, 2, 5, "", false, new TextNode("done"))). build()). waitFor(cluster.coordinatorClient()); } @@ -131,26 +136,34 @@ public class CoordinatorTest { NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 2); coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec)); new ExpectedTasks(). - addTask(new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build()). + addTask(new ExpectedTaskBuilder("foo").taskState( + new TaskPending(fooSpec)).build()). waitFor(coordinatorClient). waitFor(agentClient1). waitFor(agentClient2); time.sleep(11); + ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance); + status1.set("node01", new TextNode("active")); + status1.set("node02", new TextNode("active")); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - taskState(new TaskRunning(fooSpec, 11)). - workerState(new WorkerRunning(fooSpec, 11, "")). + taskState(new TaskRunning(fooSpec, 11, status1)). + workerState(new WorkerRunning(fooSpec, 11, new TextNode("active"))). build()). waitFor(coordinatorClient). waitFor(agentClient1). waitFor(agentClient2); time.sleep(2); + ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance); + status2.set("node01", new TextNode("done")); + status2.set("node02", new TextNode("done")); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - taskState(new TaskDone(fooSpec, 11, 13, "", false)). - workerState(new WorkerDone(fooSpec, 11, 13, "", "")). + taskState(new TaskDone(fooSpec, 11, 13, + "", false, status2)). + workerState(new WorkerDone(fooSpec, 11, 13, new TextNode("done"), "")). build()). waitFor(coordinatorClient). waitFor(agentClient1). @@ -186,21 +199,29 @@ public class CoordinatorTest { waitFor(agentClient2); time.sleep(11); + + ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance); + status1.set("node01", new TextNode("active")); + status1.set("node02", new TextNode("active")); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - taskState(new TaskRunning(fooSpec, 11)). - workerState(new WorkerRunning(fooSpec, 11, "")). + taskState(new TaskRunning(fooSpec, 11, status1)). + workerState(new WorkerRunning(fooSpec, 11, new TextNode("active"))). build()). waitFor(coordinatorClient). waitFor(agentClient1). waitFor(agentClient2); + ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance); + status2.set("node01", new TextNode("done")); + status2.set("node02", new TextNode("done")); time.sleep(1); coordinatorClient.stopTask(new StopTaskRequest("foo")); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - taskState(new TaskDone(fooSpec, 11, 12, "", true)). - workerState(new WorkerDone(fooSpec, 11, 12, "", "")). + taskState(new TaskDone(fooSpec, 11, 12, "", + true, status2)). + workerState(new WorkerDone(fooSpec, 11, 12, new TextNode("done"), "")). build()). waitFor(coordinatorClient). waitFor(agentClient1). @@ -375,8 +396,8 @@ public class CoordinatorTest { time.sleep(2); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - taskState(new TaskRunning(fooSpec, 2)). - workerState(new WorkerRunning(fooSpec, 2, "")). + taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))). + workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))). build()). addTask(new ExpectedTaskBuilder("bar"). taskState(new TaskPending(barSpec)). @@ -394,4 +415,73 @@ public class CoordinatorTest { new TasksRequest(null, 3, 0, 0, 0)).tasks().size()); } } + + @Test + public void testWorkersExitingAtDifferentTimes() throws Exception { + MockTime time = new MockTime(0, 0, 0); + Scheduler scheduler = new MockScheduler(time); + try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder(). + addCoordinator("node01"). + addAgent("node02"). + addAgent("node03"). + scheduler(scheduler). + build()) { + CoordinatorClient coordinatorClient = cluster.coordinatorClient(); + new ExpectedTasks().waitFor(coordinatorClient); + + HashMap<String, Long> nodeToExitMs = new HashMap<>(); + nodeToExitMs.put("node02", 10L); + nodeToExitMs.put("node03", 20L); + SampleTaskSpec fooSpec = + new SampleTaskSpec(2, 100, nodeToExitMs, ""); + coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec)); + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + taskState(new TaskPending(fooSpec)). + build()). + waitFor(coordinatorClient); + + time.sleep(2); + ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance); + status1.set("node02", new TextNode("active")); + status1.set("node03", new TextNode("active")); + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + taskState(new TaskRunning(fooSpec, 2, status1)). + workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))). + build()). + waitFor(coordinatorClient). + waitFor(cluster.agentClient("node02")). + waitFor(cluster.agentClient("node03")); + + time.sleep(10); + ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance); + status2.set("node02", new TextNode("halted")); + status2.set("node03", new TextNode("active")); + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + taskState(new TaskRunning(fooSpec, 2, status2)). + workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))). + build()). + waitFor(coordinatorClient). + waitFor(cluster.agentClient("node03")); + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + taskState(new TaskRunning(fooSpec, 2, status2)). + workerState(new WorkerDone(fooSpec, 2, 12, new TextNode("halted"), "")). + build()). + waitFor(cluster.agentClient("node02")); + + time.sleep(10); + ObjectNode status3 = new ObjectNode(JsonNodeFactory.instance); + status3.set("node02", new TextNode("halted")); + status3.set("node03", new TextNode("halted")); + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + taskState(new TaskDone(fooSpec, 2, 22, "", + false, status3)). + build()). + waitFor(coordinatorClient); + } + } }; diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java index 26fdfb2..38a160f 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java @@ -20,23 +20,28 @@ package org.apache.kafka.trogdor.task; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + public class SampleTaskSpec extends TaskSpec { - private final long exitMs; + private final Map<String, Long> nodeToExitMs; private final String error; @JsonCreator public SampleTaskSpec(@JsonProperty("startMs") long startMs, @JsonProperty("durationMs") long durationMs, - @JsonProperty("exitMs") long exitMs, + @JsonProperty("nodeToExitMs") Map<String, Long> nodeToExitMs, @JsonProperty("error") String error) { super(startMs, durationMs); - this.exitMs = exitMs; + this.nodeToExitMs = nodeToExitMs == null ? new HashMap<String, Long>() : + Collections.unmodifiableMap(nodeToExitMs); this.error = error == null ? "" : error; } @JsonProperty - public long exitMs() { - return exitMs; + public Map<String, Long> nodeToExitMs() { + return nodeToExitMs; } @JsonProperty diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java index ebac27e..ade055d 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java @@ -17,6 +17,7 @@ package org.apache.kafka.trogdor.task; +import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.common.ThreadUtils; @@ -26,12 +27,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; public class SampleTaskWorker implements TaskWorker { private final SampleTaskSpec spec; private final ScheduledExecutorService executor; private Future<Void> future; + private WorkerStatusTracker status; SampleTaskWorker(SampleTaskSpec spec) { this.spec = spec; @@ -41,17 +42,24 @@ public class SampleTaskWorker implements TaskWorker { } @Override - public synchronized void start(Platform platform, AtomicReference<String> status, + public synchronized void start(Platform platform, WorkerStatusTracker status, final KafkaFutureImpl<String> haltFuture) throws Exception { if (this.future != null) return; + this.status = status; + this.status.update(new TextNode("active")); + + Long exitMs = spec.nodeToExitMs().get(platform.curNode().name()); + if (exitMs == null) { + exitMs = Long.MAX_VALUE; + } this.future = platform.scheduler().schedule(executor, new Callable<Void>() { @Override public Void call() throws Exception { haltFuture.complete(spec.error()); return null; } - }, spec.exitMs()); + }, exitMs); } @Override @@ -59,5 +67,6 @@ public class SampleTaskWorker implements TaskWorker { this.future.cancel(false); this.executor.shutdown(); this.executor.awaitTermination(1, TimeUnit.DAYS); + this.status.update(new TextNode("halted")); } }; diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java b/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java index abd7e62..d8d4ca9 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java @@ -40,11 +40,11 @@ public class TaskSpecTest { } catch (InvalidTypeIdException e) { } String inputJson = "{\"class\":\"org.apache.kafka.trogdor.task.SampleTaskSpec\"," + - "\"startMs\":123,\"durationMs\":456,\"exitMs\":1000,\"error\":\"foo\"}"; + "\"startMs\":123,\"durationMs\":456,\"nodeToExitMs\":{\"node01\":1000},\"error\":\"foo\"}"; SampleTaskSpec spec = JsonUtil.JSON_SERDE.readValue(inputJson, SampleTaskSpec.class); assertEquals(123, spec.startMs()); assertEquals(456, spec.durationMs()); - assertEquals(1000, spec.exitMs()); + assertEquals(Long.valueOf(1000), spec.nodeToExitMs().get("node01")); assertEquals("foo", spec.error()); String outputJson = JsonUtil.toJsonString(spec); assertEquals(inputJson, outputJson); -- To stop receiving notification emails like this one, please contact rsiva...@apache.org.