This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 4be68c5 KAFKA-7828: Add ExternalCommandWorker to Trogdor (#6219) 4be68c5 is described below commit 4be68c58da6bbd16cb5682dbf1c9b5b1c93500a7 Author: Colin Patrick McCabe <co...@cmccabe.xyz> AuthorDate: Wed Feb 6 16:42:02 2019 -0800 KAFKA-7828: Add ExternalCommandWorker to Trogdor (#6219) Allow the Trogdor agent to execute external commands. The agent communicates with the external commands via stdin, stdout, and stderr. Based on a patch by Xi Yang <x...@confluent.io> Reviewers: David Arthur <mum...@gmail.com> --- tests/bin/external_trogdor_command_example.py | 38 ++ tests/spec/external_command.json | 33 ++ .../trogdor/workload/ExternalCommandSpec.java | 115 ++++++ .../trogdor/workload/ExternalCommandWorker.java | 398 +++++++++++++++++++++ .../workload/ExternalCommandWorkerTest.java | 196 ++++++++++ 5 files changed, 780 insertions(+) diff --git a/tests/bin/external_trogdor_command_example.py b/tests/bin/external_trogdor_command_example.py new file mode 100755 index 0000000..0e53557 --- /dev/null +++ b/tests/bin/external_trogdor_command_example.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import sys +import time + +# +# This is an example of an external script which can be run through Trogdor's +# ExternalCommandWorker. +# + +if __name__ == '__main__': + # Read the ExternalCommandWorker start message. + line = sys.stdin.readline() + start_message = json.loads(line) + workload = start_message["workload"] + print("Starting external_trogdor_command_example with task id %s, workload %s" \ + % (start_message["id"], workload)) + sys.stdout.flush() + `print(json.dumps({"status": "running"}))` + sys.stdout.flush() + time.sleep(0.001 * workload["delayMs"]) + `print(json.dumps({"status": "exiting after %s delayMs" % workload["delayMs"]}))` + sys.stdout.flush() diff --git a/tests/spec/external_command.json b/tests/spec/external_command.json new file mode 100755 index 0000000..d432938 --- /dev/null +++ b/tests/spec/external_command.json @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +// An example task specification for running an external command in Trogdor. +// Note that this task spec uses a relative path, so make sure you launch +// Trogdor from the project root directory when using it. +// See TROGDOR.md for details. 
+// + +{ + "class": "org.apache.kafka.trogdor.workload.ExternalCommandSpec", + "command": ["./tests/bin/external_trogdor_command_example.py"], + "durationMs": 10000000, + "commandNode": "node0", + "workload":{ + "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec", + "message": "Hello, world", + "delayMs": 2000 + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandSpec.java new file mode 100644 index 0000000..4947aed --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandSpec.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; + +import com.fasterxml.jackson.databind.node.NullNode; +import org.apache.kafka.trogdor.task.TaskController; +import org.apache.kafka.trogdor.task.TaskSpec; +import org.apache.kafka.trogdor.task.TaskWorker; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +/** + * ExternalCommandSpec describes a task that runs an external command and passes it the configured workload. + * + * An example uses the python runner to execute the ProduceBenchSpec task. + * + * #{@code + * { + * "class": "org.apache.kafka.trogdor.workload.ExternalCommandSpec", + * "command": ["python", "/path/to/trogdor/python/runner"], + * "durationMs": 10000000, + * "commandNode": "node0", + * "workload": { + * "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec", + * "bootstrapServers": "localhost:9092", + * "targetMessagesPerSec": 10, + * "maxMessages": 100, + * "activeTopics": { + * "foo[1-3]": { + * "numPartitions": 3, + * "replicationFactor": 1 + * } + * }, + * "inactiveTopics": { + * "foo[4-5]": { + * "numPartitions": 3, + * "replicationFactor": 1 + * } + * } + * } + * } + */ +public class ExternalCommandSpec extends TaskSpec { + private final String commandNode; + private final List<String> command; + private final JsonNode workload; + private final Optional<Integer> shutdownGracePeriodMs; + + @JsonCreator + public ExternalCommandSpec( + @JsonProperty("startMs") long startMs, + @JsonProperty("durationMs") long durationMs, + @JsonProperty("commandNode") String commandNode, + @JsonProperty("command") List<String> command, + @JsonProperty("workload") JsonNode workload, + @JsonProperty("shutdownGracePeriodMs") Optional<Integer> shutdownGracePeriodMs) { + super(startMs, durationMs); + this.commandNode = (commandNode == null) ?
"" : commandNode; + this.command = (command == null) ? Collections.unmodifiableList(new ArrayList<String>()) : command; + this.workload = (workload == null) ? NullNode.instance : workload; + this.shutdownGracePeriodMs = shutdownGracePeriodMs; + } + + @JsonProperty + public String commandNode() { + return commandNode; + } + + @JsonProperty + public List<String> command() { + return command; + } + + @JsonProperty + public JsonNode workload() { + return workload; + } + + @JsonProperty + public Optional<Integer> shutdownGracePeriodMs() { + return shutdownGracePeriodMs; + } + + @Override + public TaskController newController(String id) { + return topology -> Collections.singleton(commandNode); + } + + @Override + public TaskWorker newTaskWorker(String id) { + return new ExternalCommandWorker(id, this); + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandWorker.java new file mode 100644 index 0000000..6f5799f --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandWorker.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.NullNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.trogdor.common.JsonUtil; +import org.apache.kafka.trogdor.common.Platform; +import org.apache.kafka.trogdor.common.ThreadUtils; +import org.apache.kafka.trogdor.task.TaskWorker; +import org.apache.kafka.trogdor.task.WorkerStatusTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; + +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Optional; + +/** + * ExternalCommandWorker starts an external process to run a Trogdor command. + * + * The worker communicates with the external process over the standard input and output streams. + * + * When the process is first launched, ExternalCommandWorker will send a message on standard + * input describing the task ID and the workload. This message will not contain line breaks. + * It will have this JSON format: + * {"id":<task ID string>, "workload":<configured workload JSON object>} + * + * ExternalCommandWorker will log anything that the process writes to stderr, but will take + * no other action with it. + * + * If the process sends a single-line JSON object to stdout, ExternalCommandWorker will parse it. 
+ * The JSON object can contain the following fields: + * - status: If the object contains this field, the status will be set to the given value. + * - error: If the object contains this field, the error will be set to the given value. + * Once an error occurs, we will try to terminate the process. + * - log: If the object contains this field, a log message will be issued with this text. + * + * Note that standard output is buffered by default. The subprocess may wish + * to flush it after writing its status JSON. This will ensure that the status + * is seen in a timely fashion. + * + * If the process sends a non-JSON line to stdout, the worker will log it. + * + * If the process exits, ExternalCommandWorker will finish. If the process exits unsuccessfully, + * this is considered an error. If the worker needs to stop the process, it will start by sending + * a SIGTERM. If this does not have the required effect, it will send a SIGKILL, once the shutdown + * grace period has elapsed. + */ +public class ExternalCommandWorker implements TaskWorker { + private static final Logger log = LoggerFactory.getLogger(ExternalCommandWorker.class); + + private static final int DEFAULT_SHUTDOWN_GRACE_PERIOD_MS = 5000; + + /** + * True only if the worker is running. + */ + private final AtomicBoolean running = new AtomicBoolean(false); + + enum TerminatorAction { + DESTROY, + DESTROY_FORCIBLY, + CLOSE + } + + /** + * A queue used to communicate with the signal sender thread. + */ + private final LinkedBlockingQueue<TerminatorAction> terminatorActionQueue = new LinkedBlockingQueue<>(); + + /** + * The queue of objects to write to the process stdin. + */ + private final LinkedBlockingQueue<Optional<JsonNode>> stdinQueue = new LinkedBlockingQueue<>(); + + /** + * The task ID. + */ + private final String id; + + /** + * The command specification. + */ + private final ExternalCommandSpec spec; + + /** + * Tracks the worker status. 
+ */ + private WorkerStatusTracker status; + + /** + * A future which should be completed when this worker is done. + */ + private KafkaFutureImpl<String> doneFuture; + + /** + * The executor service for this worker. + */ + private ExecutorService executor; + + public ExternalCommandWorker(String id, ExternalCommandSpec spec) { + this.id = id; + this.spec = spec; + } + + @Override + public void start(Platform platform, WorkerStatusTracker status, + KafkaFutureImpl<String> doneFuture) throws Exception { + if (!running.compareAndSet(false, true)) { + throw new IllegalStateException("ConsumeBenchWorker is already running."); + } + log.info("{}: Activating ExternalCommandWorker with {}", id, spec); + this.status = status; + this.doneFuture = doneFuture; + this.executor = Executors.newCachedThreadPool( + ThreadUtils.createThreadFactory("ExternalCommandWorkerThread%d", false)); + Process process = null; + try { + process = startProcess(); + } catch (Throwable t) { + log.error("{}: Unable to start process", id, t); + executor.shutdown(); + doneFuture.complete("Unable to start process: " + t.getMessage()); + return; + } + Future<?> stdoutFuture = executor.submit(new StdoutMonitor(process)); + Future<?> stderrFuture = executor.submit(new StderrMonitor(process)); + executor.submit(new StdinWriter(process)); + Future<?> terminatorFuture = executor.submit(new Terminator(process)); + executor.submit(new ExitMonitor(process, stdoutFuture, stderrFuture, terminatorFuture)); + ObjectNode startMessage = new ObjectNode(JsonNodeFactory.instance); + startMessage.set("id", new TextNode(id)); + startMessage.set("workload", spec.workload()); + stdinQueue.add(Optional.of(startMessage)); + } + + private Process startProcess() throws Exception { + if (spec.command().isEmpty()) { + throw new RuntimeException("No command specified"); + } + ProcessBuilder bld = new ProcessBuilder(spec.command()); + Process process = bld.start(); + return process; + } + + private static JsonNode 
readObject(String line) { + JsonNode resp; + try { + resp = JsonUtil.JSON_SERDE.readTree(line); + } catch (IOException e) { + return NullNode.instance; + } + return resp; + } + + class StdoutMonitor implements Runnable { + private final Process process; + + StdoutMonitor(Process process) { + this.process = process; + } + + @Override + public void run() { + log.trace("{}: starting stdout monitor.", id); + try (BufferedReader br = new BufferedReader( + new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) { + String line; + while (true) { + try { + line = br.readLine(); + if (line == null) { + throw new IOException("EOF"); + } + } catch (IOException e) { + log.info("{}: can't read any more from stdout: {}", id, e.getMessage()); + return; + } + log.trace("{}: read line from stdin: {}", id, line); + JsonNode resp = readObject(line); + if (resp.has("status")) { + log.info("{}: New status: {}", id, resp.get("status").toString()); + status.update(resp.get("status")); + } + if (resp.has("log")) { + log.info("{}: (stdout): {}", id, resp.get("log").asText()); + } + if (resp.has("error")) { + String error = resp.get("error").asText(); + log.error("{}: error: {}", id, error); + doneFuture.complete(error); + } + } + } catch (Throwable e) { + log.info("{}: error reading from stdout.", id, e); + } + } + } + + class StderrMonitor implements Runnable { + private final Process process; + + StderrMonitor(Process process) { + this.process = process; + } + + @Override + public void run() { + log.trace("{}: starting stderr monitor.", id); + try (BufferedReader br = new BufferedReader( + new InputStreamReader(process.getErrorStream(), StandardCharsets.UTF_8))) { + String line; + while (true) { + try { + line = br.readLine(); + if (line == null) { + throw new IOException("EOF"); + } + } catch (IOException e) { + log.info("{}: can't read any more from stderr: {}", id, e.getMessage()); + return; + } + log.error("{}: (stderr):{}", id, line); + } + } catch (Throwable e) { 
+ log.info("{}: error reading from stderr.", id, e); + } + } + } + + class StdinWriter implements Runnable { + private final Process process; + + StdinWriter(Process process) { + this.process = process; + } + + @Override + public void run() { + OutputStreamWriter stdinWriter = new OutputStreamWriter( + process.getOutputStream(), StandardCharsets.UTF_8); + try { + while (true) { + log.info("{}: stdin writer ready.", id); + Optional<JsonNode> node = stdinQueue.take(); + if (!node.isPresent()) { + log.trace("{}: StdinWriter terminating.", id); + return; + } + String inputString = JsonUtil.toJsonString(node.get()); + log.info("{}: writing to stdin: {}", id, inputString); + stdinWriter.write(inputString + "\n"); + stdinWriter.flush(); + } + } catch (IOException e) { + log.info("{}: can't write any more to stdin: {}", id, e.getMessage()); + } catch (Throwable e) { + log.info("{}: error writing to stdin.", id, e); + } finally { + try { + stdinWriter.close(); + } catch (IOException e) { + log.debug("{}: error closing stdinWriter: {}", id, e.getMessage()); + } + } + } + } + + class ExitMonitor implements Runnable { + private final Process process; + private final Future<?> stdoutFuture; + private final Future<?> stderrFuture; + private final Future<?> terminatorFuture; + + ExitMonitor(Process process, Future<?> stdoutFuture, Future<?> stderrFuture, + Future<?> terminatorFuture) { + this.process = process; + this.stdoutFuture = stdoutFuture; + this.stderrFuture = stderrFuture; + this.terminatorFuture = terminatorFuture; + } + + @Override + public void run() { + try { + int exitStatus = process.waitFor(); + log.info("{}: process exited with return code {}", id, exitStatus); + // Wait for the stdout and stderr monitors to exit. It's particularly important + // to wait for the stdout monitor to exit since there may be an error or status + // there that we haven't seen yet. 
+ stdoutFuture.get(); + stderrFuture.get(); + // Try to complete doneFuture with an error status based on the exit code. Note + // that if doneFuture was already completed previously, this will have no effect. + if (exitStatus == 0) { + doneFuture.complete(""); + } else { + doneFuture.complete("exited with return code " + exitStatus); + } + // Tell the StdinWriter thread to exit. + stdinQueue.add(Optional.empty()); + // Tell the shutdown manager thread to exit. + terminatorActionQueue.add(TerminatorAction.CLOSE); + terminatorFuture.get(); + executor.shutdown(); + } catch (Throwable e) { + log.error("{}: ExitMonitor error", id, e); + doneFuture.complete("ExitMonitor error: " + e.getMessage()); + } + } + } + + /** + * The thread which manages terminating the child process. + */ + class Terminator implements Runnable { + private final Process process; + + Terminator(Process process) { + this.process = process; + } + + @Override + public void run() { + try { + while (true) { + switch (terminatorActionQueue.take()) { + case DESTROY: + log.info("{}: destroying process", id); + process.getInputStream().close(); + process.getErrorStream().close(); + process.destroy(); + break; + case DESTROY_FORCIBLY: + log.info("{}: forcibly destroying process", id); + process.getInputStream().close(); + process.getErrorStream().close(); + process.destroyForcibly(); + break; + case CLOSE: + log.trace("{}: closing Terminator thread.", id); + return; + } + } + } catch (Throwable e) { + log.error("{}: Terminator error", id, e); + doneFuture.complete("Terminator error: " + e.getMessage()); + } + } + } + + @Override + public void stop(Platform platform) throws Exception { + if (!running.compareAndSet(true, false)) { + throw new IllegalStateException("ExternalCommandWorker is not running."); + } + log.info("{}: Deactivating ExternalCommandWorker.", id); + terminatorActionQueue.add(TerminatorAction.DESTROY); + int shutdownGracePeriodMs = spec.shutdownGracePeriodMs().isPresent() ? 
+ spec.shutdownGracePeriodMs().get() : DEFAULT_SHUTDOWN_GRACE_PERIOD_MS; + if (!executor.awaitTermination(shutdownGracePeriodMs, TimeUnit.MILLISECONDS)) { + terminatorActionQueue.add(TerminatorAction.DESTROY_FORCIBLY); + executor.awaitTermination(1000, TimeUnit.DAYS); + } + this.status = null; + this.doneFuture = null; + this.executor = null; + } +} diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/ExternalCommandWorkerTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/ExternalCommandWorkerTest.java new file mode 100644 index 0000000..9800b13 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/ExternalCommandWorkerTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.utils.OperatingSystem; +import org.apache.kafka.test.TestUtils; +import org.apache.kafka.trogdor.task.AgentWorkerStatusTracker; +import org.apache.kafka.trogdor.task.WorkerStatusTracker; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.io.File; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ExternalCommandWorkerTest { + @Rule + final public Timeout globalTimeout = Timeout.millis(120000); + + static class ExternalCommandWorkerBuilder { + private final String id; + private int shutdownGracePeriodMs = 3000000; + private String[] command = new String[0]; + private ObjectNode workload; + + ExternalCommandWorkerBuilder(String id) { + this.id = id; + this.workload = new ObjectNode(JsonNodeFactory.instance); + this.workload.set("foo", new TextNode("value1")); + this.workload.set("bar", new IntNode(123)); + } + + ExternalCommandWorker build() { + ExternalCommandSpec spec = new ExternalCommandSpec(0, + 30000, + "node0", + Arrays.asList(command), + workload, + Optional.of(shutdownGracePeriodMs)); + return new ExternalCommandWorker(id, spec); + } + + ExternalCommandWorkerBuilder command(String... 
command) { + this.command = command; + return this; + } + + ExternalCommandWorkerBuilder shutdownGracePeriodMs(int shutdownGracePeriodMs) { + this.shutdownGracePeriodMs = shutdownGracePeriodMs; + return this; + } + } + + /** + * Test running a process which exits successfully-- in this case, /bin/true. + */ + @Test + public void testProcessWithNormalExit() throws Exception { + if (OperatingSystem.IS_WINDOWS) return; + ExternalCommandWorker worker = + new ExternalCommandWorkerBuilder("trueTask").command("true").build(); + KafkaFutureImpl<String> doneFuture = new KafkaFutureImpl<>(); + worker.start(null, new AgentWorkerStatusTracker(), doneFuture); + assertEquals("", doneFuture.get()); + worker.stop(null); + } + + /** + * Test running a process which exits unsuccessfully-- in this case, /bin/false. + */ + @Test + public void testProcessWithFailedExit() throws Exception { + if (OperatingSystem.IS_WINDOWS) return; + ExternalCommandWorker worker = + new ExternalCommandWorkerBuilder("falseTask").command("false").build(); + KafkaFutureImpl<String> doneFuture = new KafkaFutureImpl<>(); + worker.start(null, new AgentWorkerStatusTracker(), doneFuture); + assertEquals("exited with return code 1", doneFuture.get()); + worker.stop(null); + } + + /** + * Test attempting to run an executable which doesn't exist. + * We use a path which starts with /dev/null, since that should never be a + * directory in UNIX. + */ + @Test + public void testProcessNotFound() throws Exception { + ExternalCommandWorker worker = + new ExternalCommandWorkerBuilder("notFoundTask"). + command("/dev/null/non/existent/script/path").build(); + KafkaFutureImpl<String> doneFuture = new KafkaFutureImpl<>(); + worker.start(null, new AgentWorkerStatusTracker(), doneFuture); + String errorString = doneFuture.get(); + assertTrue(errorString.startsWith("Unable to start process")); + worker.stop(null); + } + + /** + * Test running a process which times out. We will send it a SIGTERM.
+ */ + @Test + public void testProcessStop() throws Exception { + if (OperatingSystem.IS_WINDOWS) return; + ExternalCommandWorker worker = + new ExternalCommandWorkerBuilder("testStopTask"). + command("sleep", "3600000").build(); + KafkaFutureImpl<String> doneFuture = new KafkaFutureImpl<>(); + worker.start(null, new AgentWorkerStatusTracker(), doneFuture); + worker.stop(null); + // We don't check the numeric return code, since that will vary based on + // platform. + assertTrue(doneFuture.get().startsWith("exited with return code ")); + } + + /** + * Test running a process which needs to be force-killed. + */ + @Test + public void testProcessForceKillTimeout() throws Exception { + if (OperatingSystem.IS_WINDOWS) return; + File tempFile = null; + try { + tempFile = TestUtils.tempFile(); + try (OutputStream stream = Files.newOutputStream(tempFile.toPath())) { + for (String line : new String[] { + "echo hello world\n", + "# Test that the initial message is sent correctly.\n", + "read -r line\n", + "[[ $line == '{\"id\":\"testForceKillTask\",\"workload\":{\"foo\":\"value1\",\"bar\":123}}' ]] || exit 0\n", + "\n", + "# Ignore SIGTERM signals. This ensures that we test SIGKILL delivery.\n", + "trap 'echo SIGTERM' SIGTERM\n", + "\n", + "# Update the process status. This will also unblock the junit test.\n", + "# It is important that we do this after we disabled SIGTERM, to ensure\n", + "# that we are testing SIGKILL.\n", + "echo '{\"status\": \"green\", \"log\": \"my log message.\"}'\n", + "\n", + "# Wait for the SIGKILL.\n", + "while true; do sleep 0.01; done\n"}) { + stream.write(line.getBytes(StandardCharsets.UTF_8)); + } + } + CompletableFuture<String> statusFuture = new CompletableFuture<>(); + final WorkerStatusTracker statusTracker = new WorkerStatusTracker() { + @Override + public void update(JsonNode status) { + statusFuture .complete(status.textValue().toString()); + } + }; + ExternalCommandWorker worker = new ExternalCommandWorkerBuilder("testForceKillTask"). 
+ shutdownGracePeriodMs(1). + command("bash", tempFile.getAbsolutePath()). + build(); + KafkaFutureImpl<String> doneFuture = new KafkaFutureImpl<>(); + worker.start(null, statusTracker, doneFuture); + assertEquals("green", statusFuture.get()); + worker.stop(null); + assertTrue(doneFuture.get().startsWith("exited with return code ")); + } finally { + if (tempFile != null) { + Files.delete(tempFile.toPath()); + } + } + } +}