This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4be68c5  KAFKA-7828: Add ExternalCommandWorker to Trogdor (#6219)
4be68c5 is described below

commit 4be68c58da6bbd16cb5682dbf1c9b5b1c93500a7
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Wed Feb 6 16:42:02 2019 -0800

    KAFKA-7828: Add ExternalCommandWorker to Trogdor (#6219)
    
    Allow the Trogdor agent to execute external commands. The agent 
communicates with the external commands via stdin, stdout, and stderr.
    
    Based on a patch by Xi Yang <x...@confluent.io>
    
    Reviewers: David Arthur <mum...@gmail.com>
---
 tests/bin/external_trogdor_command_example.py      |  38 ++
 tests/spec/external_command.json                   |  33 ++
 .../trogdor/workload/ExternalCommandSpec.java      | 115 ++++++
 .../trogdor/workload/ExternalCommandWorker.java    | 398 +++++++++++++++++++++
 .../workload/ExternalCommandWorkerTest.java        | 196 ++++++++++
 5 files changed, 780 insertions(+)

diff --git a/tests/bin/external_trogdor_command_example.py 
b/tests/bin/external_trogdor_command_example.py
new file mode 100755
index 0000000..0e53557
--- /dev/null
+++ b/tests/bin/external_trogdor_command_example.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import sys
+import time
+
+#
+# This is an example of an external script which can be run through Trogdor's
+# ExternalCommandWorker.
+#
+
+if __name__ == '__main__':
+    # Read the ExternalCommandWorker start message.
+    line = sys.stdin.readline()
+    start_message = json.loads(line)
+    workload = start_message["workload"]
+    print("Starting external_trogdor_command_example with task id %s, workload 
%s" \
+        % (start_message["id"], workload))
+    sys.stdout.flush()
+   `print(json.dumps({"status": "running"}))`
+    sys.stdout.flush()
+    time.sleep(0.001 * workload["delayMs"])
+   `print(json.dumps({"status": "exiting after %s delayMs" % 
workload["delayMs"]}))`
+    sys.stdout.flush()
diff --git a/tests/spec/external_command.json b/tests/spec/external_command.json
new file mode 100755
index 0000000..d432938
--- /dev/null
+++ b/tests/spec/external_command.json
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//
+// An example task specification for running an external command in Trogdor.
+// Note that this task spec uses a relative path, so make sure you launch
+// Trogdor from the project root directory when using it.
+// See TROGDOR.md for details.
+//
+
+{
+  "class": "org.apache.kafka.trogdor.workload.ExternalCommandSpec",
+  "command": ["./tests/bin/external_trogdor_command_example.py"],
+  "durationMs": 10000000,
+  "commandNode": "node0",
+  "workload":{
+    "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
+    "message": "Hello, world",
+    "delayMs": 2000
+  }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandSpec.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandSpec.java
new file mode 100644
index 0000000..4947aed
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandSpec.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * ExternalCommandSpec describes a task that runs an external command on a single
+ * node and hands it the configured workload.
+ *
+ * An example uses the python runner to execute the ProduceBenchSpec task.
+ *
+ * <pre>{@code
+ *   {
+ *      "class": "org.apache.kafka.trogdor.workload.ExternalCommandSpec",
+ *      "command": ["python", "/path/to/trogdor/python/runner"],
+ *      "durationMs": 10000000,
+ *      "commandNode": "node0",
+ *      "workload": {
+ *        "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
+ *        "bootstrapServers": "localhost:9092",
+ *        "targetMessagesPerSec": 10,
+ *        "maxMessages": 100,
+ *        "activeTopics": {
+ *          "foo[1-3]": {
+ *            "numPartitions": 3,
+ *            "replicationFactor": 1
+ *          }
+ *        },
+ *        "inactiveTopics": {
+ *          "foo[4-5]": {
+ *            "numPartitions": 3,
+ *            "replicationFactor": 1
+ *          }
+ *        }
+ *     }
+ *   }
+ * }</pre>
+ */
+public class ExternalCommandSpec extends TaskSpec {
+    private final String commandNode;
+    private final List<String> command;
+    private final JsonNode workload;
+    private final Optional<Integer> shutdownGracePeriodMs;
+
+    /**
+     * Creates the spec.  Every argument which may be absent from the JSON is
+     * replaced with a non-null default so that the accessors never return null.
+     *
+     * @param startMs                 The task start time, passed to TaskSpec.
+     * @param durationMs              The task duration, passed to TaskSpec.
+     * @param commandNode             The node to run the command on; null becomes "".
+     * @param command                 The command and its arguments; null becomes an empty list.
+     * @param workload                The workload JSON; null becomes a JSON null node.
+     * @param shutdownGracePeriodMs   The shutdown grace period; null becomes an empty Optional.
+     */
+    @JsonCreator
+    public ExternalCommandSpec(
+            @JsonProperty("startMs") long startMs,
+            @JsonProperty("durationMs") long durationMs,
+            @JsonProperty("commandNode") String commandNode,
+            @JsonProperty("command") List<String> command,
+            @JsonProperty("workload") JsonNode workload,
+            @JsonProperty("shutdownGracePeriodMs") Optional<Integer> shutdownGracePeriodMs) {
+        super(startMs, durationMs);
+        this.commandNode = (commandNode == null) ? "" : commandNode;
+        this.command = (command == null) ? Collections.unmodifiableList(new ArrayList<String>()) : command;
+        this.workload = (workload == null) ? NullNode.instance : workload;
+        // Guard against a null Optional, which would otherwise surface later as a
+        // NullPointerException in whoever calls shutdownGracePeriodMs().
+        this.shutdownGracePeriodMs = (shutdownGracePeriodMs == null) ?
+            Optional.empty() : shutdownGracePeriodMs;
+    }
+
+    /**
+     * @return the name of the node which the command runs on.
+     */
+    @JsonProperty
+    public String commandNode() {
+        return commandNode;
+    }
+
+    /**
+     * @return the command to execute, followed by its arguments.
+     */
+    @JsonProperty
+    public List<String> command() {
+        return command;
+    }
+
+    /**
+     * @return the workload JSON, or a JSON null node if none was configured.
+     */
+    @JsonProperty
+    public JsonNode workload() {
+        return workload;
+    }
+
+    /**
+     * @return the configured shutdown grace period in milliseconds, if any.
+     */
+    @JsonProperty
+    public Optional<Integer> shutdownGracePeriodMs() {
+        return shutdownGracePeriodMs;
+    }
+
+    /**
+     * @return a controller which targets only the command node.
+     */
+    @Override
+    public TaskController newController(String id) {
+        return topology -> Collections.singleton(commandNode);
+    }
+
+    /**
+     * @return a new ExternalCommandWorker for this spec.
+     */
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new ExternalCommandWorker(id, this);
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandWorker.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandWorker.java
new file mode 100644
index 0000000..6f5799f
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandWorker.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Optional;
+
+/**
+ * ExternalCommandWorker starts an external process to run a Trogdor command.
+ *
+ * The worker communicates with the external process over the standard input and output streams.
+ *
+ * When the process is first launched, ExternalCommandWorker will send a message on standard
+ * input describing the task ID and the workload.  This message will not contain line breaks.
+ * It will have this JSON format:
+ * {"id":<task ID string>, "workload":<configured workload JSON object>}
+ *
+ * ExternalCommandWorker will log anything that the process writes to stderr, but will take
+ * no other action with it.
+ *
+ * If the process sends a single-line JSON object to stdout, ExternalCommandWorker will parse it.
+ * The JSON object can contain the following fields:
+ * - status: If the object contains this field, the status will be set to the given value.
+ * - error: If the object contains this field, the error will be set to the given value.
+ *   Once an error occurs, we will try to terminate the process.
+ * - log: If the object contains this field, a log message will be issued with this text.
+ *
+ * Note that standard output is buffered by default.  The subprocess may wish
+ * to flush it after writing its status JSON.  This will ensure that the status
+ * is seen in a timely fashion.
+ *
+ * If the process sends a non-JSON line to stdout, the worker will log it.
+ *
+ * If the process exits, ExternalCommandWorker will finish.  If the process exits unsuccessfully,
+ * this is considered an error.  If the worker needs to stop the process, it will start by sending
+ * a SIGTERM.  If this does not have the required effect, it will send a SIGKILL, once the shutdown
+ * grace period has elapsed.
+ */
+public class ExternalCommandWorker implements TaskWorker {
+    private static final Logger log = 
LoggerFactory.getLogger(ExternalCommandWorker.class);
+
+    private static final int DEFAULT_SHUTDOWN_GRACE_PERIOD_MS = 5000;
+
+    /**
+     * True only if the worker is running.
+     */
+    private final AtomicBoolean running = new AtomicBoolean(false);
+
+    enum TerminatorAction {
+        DESTROY,
+        DESTROY_FORCIBLY,
+        CLOSE
+    }
+
+    /**
+     * A queue used to communicate with the signal sender thread.
+     */
+    private final LinkedBlockingQueue<TerminatorAction> terminatorActionQueue 
= new LinkedBlockingQueue<>();
+
+    /**
+     * The queue of objects to write to the process stdin.
+     */
+    private final LinkedBlockingQueue<Optional<JsonNode>> stdinQueue = new 
LinkedBlockingQueue<>();
+
+    /**
+     * The task ID.
+     */
+    private final String id;
+
+    /**
+     * The command specification.
+     */
+    private final ExternalCommandSpec spec;
+
+    /**
+     * Tracks the worker status.
+     */
+    private WorkerStatusTracker status;
+
+    /**
+     * A future which should be completed when this worker is done.
+     */
+    private KafkaFutureImpl<String> doneFuture;
+
+    /**
+     * The executor service for this worker.
+     */
+    private ExecutorService executor;
+
+    /**
+     * Creates a worker for the given task.  No process is launched here.
+     *
+     * @param id    The task ID.
+     * @param spec  The command specification.
+     */
+    public ExternalCommandWorker(String id, ExternalCommandSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    /**
+     * Starts the external process along with the threads that service it.
+     *
+     * On success, a start message holding the task id and the configured workload
+     * is queued for delivery to the standard input of the process.  If the process
+     * cannot be launched, the executor is shut down and doneFuture is completed
+     * with an error string; no exception is thrown in that case.
+     *
+     * @param platform      The platform; not used by this method.
+     * @param status        The tracker which receives status updates.
+     * @param doneFuture    The future to complete when this worker is done.
+     * @throws IllegalStateException    If the worker is already running.
+     */
+    @Override
+    public void start(Platform platform, WorkerStatusTracker status,
+                      KafkaFutureImpl<String> doneFuture) throws Exception {
+        if (!running.compareAndSet(false, true)) {
+            throw new IllegalStateException("ExternalCommandWorker is already running.");
+        }
+        log.info("{}: Activating ExternalCommandWorker with {}", id, spec);
+        this.status = status;
+        this.doneFuture = doneFuture;
+        this.executor = Executors.newCachedThreadPool(
+            ThreadUtils.createThreadFactory("ExternalCommandWorkerThread%d", false));
+        Process process = null;
+        try {
+            process = startProcess();
+        } catch (Throwable t) {
+            log.error("{}: Unable to start process", id, t);
+            // Shut the executor down here so that a later stop() does not wait on it.
+            executor.shutdown();
+            doneFuture.complete("Unable to start process: " + t.getMessage());
+            return;
+        }
+        Future<?> stdoutFuture = executor.submit(new StdoutMonitor(process));
+        Future<?> stderrFuture = executor.submit(new StderrMonitor(process));
+        executor.submit(new StdinWriter(process));
+        Future<?> terminatorFuture = executor.submit(new Terminator(process));
+        executor.submit(new ExitMonitor(process, stdoutFuture, stderrFuture, terminatorFuture));
+        // Queue the initial message which tells the process what to do.
+        ObjectNode startMessage = new ObjectNode(JsonNodeFactory.instance);
+        startMessage.set("id", new TextNode(id));
+        startMessage.set("workload", spec.workload());
+        stdinQueue.add(Optional.of(startMessage));
+    }
+
+    /**
+     * Launches the configured command as a child process.
+     *
+     * @return the newly started process.
+     * @throws RuntimeException if the spec contains no command.
+     */
+    private Process startProcess() throws Exception {
+        if (spec.command().isEmpty()) {
+            throw new RuntimeException("No command specified");
+        }
+        return new ProcessBuilder(spec.command()).start();
+    }
+
+    /**
+     * Parses a line of text as JSON.
+     *
+     * @param line  The text to parse.
+     * @return      The parsed tree, or a JSON null node if parsing failed with an IOException.
+     */
+    private static JsonNode readObject(String line) {
+        try {
+            return JsonUtil.JSON_SERDE.readTree(line);
+        } catch (IOException e) {
+            return NullNode.instance;
+        }
+    }
+
+    /**
+     * Reads the standard output of the process line by line.  For lines which
+     * parse as JSON objects, the "status", "log" and "error" fields are acted on.
+     * The monitor returns once no more output can be read.
+     */
+    class StdoutMonitor implements Runnable {
+        private final Process process;
+
+        StdoutMonitor(Process process) {
+            this.process = process;
+        }
+
+        @Override
+        public void run() {
+            log.trace("{}: starting stdout monitor.", id);
+            try (BufferedReader br = new BufferedReader(
+                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
+                String line;
+                while (true) {
+                    try {
+                        line = br.readLine();
+                        if (line == null) {
+                            // Treat end-of-stream like any other read failure.
+                            throw new IOException("EOF");
+                        }
+                    } catch (IOException e) {
+                        log.info("{}: can't read any more from stdout: {}", id, e.getMessage());
+                        return;
+                    }
+                    log.trace("{}: read line from stdout: {}", id, line);
+                    JsonNode resp = readObject(line);
+                    if (resp.has("status")) {
+                        log.info("{}: New status: {}", id, resp.get("status").toString());
+                        status.update(resp.get("status"));
+                    }
+                    if (resp.has("log")) {
+                        log.info("{}: (stdout): {}", id, resp.get("log").asText());
+                    }
+                    if (resp.has("error")) {
+                        String error = resp.get("error").asText();
+                        log.error("{}: error: {}", id, error);
+                        doneFuture.complete(error);
+                    }
+                }
+            } catch (Throwable e) {
+                log.info("{}: error reading from stdout.", id, e);
+            }
+        }
+    }
+
+    class StderrMonitor implements Runnable {
+        private final Process process;
+
+        StderrMonitor(Process process) {
+            this.process = process;
+        }
+
+        @Override
+        public void run() {
+            log.trace("{}: starting stderr monitor.", id);
+            try (BufferedReader br = new BufferedReader(
+                new InputStreamReader(process.getErrorStream(), 
StandardCharsets.UTF_8))) {
+                String line;
+                while (true) {
+                    try {
+                        line = br.readLine();
+                        if (line == null) {
+                            throw new IOException("EOF");
+                        }
+                    } catch (IOException e) {
+                        log.info("{}: can't read any more from stderr: {}", 
id, e.getMessage());
+                        return;
+                    }
+                    log.error("{}: (stderr):{}", id, line);
+                }
+            } catch (Throwable e) {
+                log.info("{}: error reading from stderr.", id, e);
+            }
+        }
+    }
+
+    class StdinWriter implements Runnable {
+        private final Process process;
+
+        StdinWriter(Process process) {
+            this.process = process;
+        }
+
+        @Override
+        public void run() {
+            OutputStreamWriter stdinWriter = new OutputStreamWriter(
+                process.getOutputStream(), StandardCharsets.UTF_8);
+            try {
+                while (true) {
+                    log.info("{}: stdin writer ready.", id);
+                    Optional<JsonNode> node = stdinQueue.take();
+                    if (!node.isPresent()) {
+                        log.trace("{}: StdinWriter terminating.", id);
+                        return;
+                    }
+                    String inputString = JsonUtil.toJsonString(node.get());
+                    log.info("{}: writing to stdin: {}", id, inputString);
+                    stdinWriter.write(inputString + "\n");
+                    stdinWriter.flush();
+                }
+            } catch (IOException e) {
+                log.info("{}: can't write any more to stdin: {}", id, 
e.getMessage());
+            } catch (Throwable e) {
+                log.info("{}: error writing to stdin.", id, e);
+            } finally {
+                try {
+                    stdinWriter.close();
+                } catch (IOException e) {
+                    log.debug("{}: error closing stdinWriter: {}", id, 
e.getMessage());
+                }
+            }
+        }
+    }
+
+    /**
+     * Waits for the process to exit, completes doneFuture based on the exit
+     * code, and then tells the other helper threads to stop.
+     */
+    class ExitMonitor implements Runnable {
+        private final Process process;
+        private final Future<?> stdoutFuture;
+        private final Future<?> stderrFuture;
+        private final Future<?> terminatorFuture;
+
+        ExitMonitor(Process process, Future<?> stdoutFuture, Future<?> stderrFuture,
+                    Future<?> terminatorFuture) {
+            this.process = process;
+            this.stdoutFuture = stdoutFuture;
+            this.stderrFuture = stderrFuture;
+            this.terminatorFuture = terminatorFuture;
+        }
+
+        @Override
+        public void run() {
+            try {
+                int returnCode = process.waitFor();
+                log.info("{}: process exited with return code {}", id, returnCode);
+                // Let both output monitors drain first.  The stdout monitor matters most,
+                // because an error or status may still be waiting there unread.
+                stdoutFuture.get();
+                stderrFuture.get();
+                // Report the outcome derived from the exit code.  This has no effect
+                // if doneFuture was already completed earlier.
+                String outcome = (returnCode == 0) ? "" : "exited with return code " + returnCode;
+                doneFuture.complete(outcome);
+                // An empty Optional asks the StdinWriter thread to exit.
+                stdinQueue.add(Optional.empty());
+                // CLOSE asks the Terminator thread to exit; wait until it has.
+                terminatorActionQueue.add(TerminatorAction.CLOSE);
+                terminatorFuture.get();
+                executor.shutdown();
+            } catch (Throwable e) {
+                log.error("{}: ExitMonitor error", id, e);
+                doneFuture.complete("ExitMonitor error: " + e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * The thread which manages terminating the child process.
+     */
+    class Terminator implements Runnable {
+        private final Process process;
+
+        Terminator(Process process) {
+            this.process = process;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    switch (terminatorActionQueue.take()) {
+                        case DESTROY:
+                            log.info("{}: destroying process", id);
+                            process.getInputStream().close();
+                            process.getErrorStream().close();
+                            process.destroy();
+                            break;
+                        case DESTROY_FORCIBLY:
+                            log.info("{}: forcibly destroying process", id);
+                            process.getInputStream().close();
+                            process.getErrorStream().close();
+                            process.destroyForcibly();
+                            break;
+                        case CLOSE:
+                            log.trace("{}: closing Terminator thread.", id);
+                            return;
+                    }
+                }
+            } catch (Throwable e) {
+                log.error("{}: Terminator error", id, e);
+                doneFuture.complete("Terminator error: " + e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Stops the worker.  A DESTROY action is queued first; if the executor has
+     * not terminated once the shutdown grace period is over, a DESTROY_FORCIBLY
+     * action is queued and the executor is awaited again.
+     *
+     * @param platform  The platform; not used by this method.
+     * @throws IllegalStateException    If the worker is not running.
+     */
+    @Override
+    public void stop(Platform platform) throws Exception {
+        if (!running.compareAndSet(true, false)) {
+            throw new IllegalStateException("ExternalCommandWorker is not running.");
+        }
+        log.info("{}: Deactivating ExternalCommandWorker.", id);
+        terminatorActionQueue.add(TerminatorAction.DESTROY);
+        int gracePeriodMs = spec.shutdownGracePeriodMs().orElse(DEFAULT_SHUTDOWN_GRACE_PERIOD_MS);
+        boolean terminatedInTime = executor.awaitTermination(gracePeriodMs, TimeUnit.MILLISECONDS);
+        if (!terminatedInTime) {
+            terminatorActionQueue.add(TerminatorAction.DESTROY_FORCIBLY);
+            executor.awaitTermination(1000, TimeUnit.DAYS);
+        }
+        this.status = null;
+        this.doneFuture = null;
+        this.executor = null;
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/workload/ExternalCommandWorkerTest.java
 
b/tools/src/test/java/org/apache/kafka/trogdor/workload/ExternalCommandWorkerTest.java
new file mode 100644
index 0000000..9800b13
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/workload/ExternalCommandWorkerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.IntNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.OperatingSystem;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.trogdor.task.AgentWorkerStatusTracker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ExternalCommandWorkerTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    /**
+     * Builds ExternalCommandWorker instances for the tests.  The workload is
+     * always {"foo":"value1","bar":123}; the command and the shutdown grace
+     * period can be overridden.
+     */
+    static class ExternalCommandWorkerBuilder {
+        private final String id;
+        private int shutdownGracePeriodMs = 3000000;
+        private String[] command = new String[0];
+        private ObjectNode workload;
+
+        ExternalCommandWorkerBuilder(String id) {
+            this.id = id;
+            ObjectNode node = new ObjectNode(JsonNodeFactory.instance);
+            node.set("foo", new TextNode("value1"));
+            node.set("bar", new IntNode(123));
+            this.workload = node;
+        }
+
+        ExternalCommandWorkerBuilder command(String... command) {
+            this.command = command;
+            return this;
+        }
+
+        ExternalCommandWorkerBuilder shutdownGracePeriodMs(int shutdownGracePeriodMs) {
+            this.shutdownGracePeriodMs = shutdownGracePeriodMs;
+            return this;
+        }
+
+        ExternalCommandWorker build() {
+            return new ExternalCommandWorker(id,
+                new ExternalCommandSpec(0,
+                    30000,
+                    "node0",
+                    Arrays.asList(command),
+                    workload,
+                    Optional.of(shutdownGracePeriodMs)));
+        }
+    }
+
+    /**
+     * Test running a process which exits successfully-- in this case, 
/bin/true.
+     */
+    @Test
+    public void testProcessWithNormalExit() throws Exception {
+        if (OperatingSystem.IS_WINDOWS) return;
+        ExternalCommandWorker worker =
+            new 
ExternalCommandWorkerBuilder("trueTask").command("true").build();
+        KafkaFutureImpl<String> doneFuture = new KafkaFutureImpl<>();
+        worker.start(null, new AgentWorkerStatusTracker(), doneFuture);
+        assertEquals("", doneFuture.get());
+        worker.stop(null);
+    }
+
+    /**
+     * Test running a process which exits unsuccessfully-- in this case, 
/bin/false.
+     */
+    @Test
+    public void testProcessWithFailedExit() throws Exception {
+        if (OperatingSystem.IS_WINDOWS) return;
+        ExternalCommandWorker worker =
+            new 
ExternalCommandWorkerBuilder("falseTask").command("false").build();
+        KafkaFutureImpl<String> doneFuture = new KafkaFutureImpl<>();
+        worker.start(null, new AgentWorkerStatusTracker(), doneFuture);
+        assertEquals("exited with return code 1", doneFuture.get());
+        worker.stop(null);
+    }
+
+    /**
+     * Test attempting to run an executable which doesn't exist.
+     * We use a path which starts with /dev/null, since that should never be a
+     * directory in UNIX.
+     */
+    @Test
+    public void testProcessNotFound() throws Exception {
+        ExternalCommandWorker worker =
+            new ExternalCommandWorkerBuilder("notFoundTask").
+                command("/dev/null/non/existent/script/path").build();
+        KafkaFutureImpl<String> doneFuture = new KafkaFutureImpl<>();
+        worker.start(null, new AgentWorkerStatusTracker(), doneFuture);
+        String errorString = doneFuture.get();
+        assertTrue(errorString.startsWith("Unable to start process"));
+        worker.stop(null);
+    }
+
+    /**
+     * Stops the worker while a long-running process (sleep) is still active.
+     * The process is expected to die and report a non-zero return code.
+     */
+    @Test
+    public void testProcessStop() throws Exception {
+        if (OperatingSystem.IS_WINDOWS) return;
+        KafkaFutureImpl<String> done = new KafkaFutureImpl<>();
+        ExternalCommandWorker worker = new ExternalCommandWorkerBuilder("testStopTask").
+            command("sleep", "3600000").
+            build();
+        worker.start(null, new AgentWorkerStatusTracker(), done);
+        worker.stop(null);
+        // The numeric return code varies by platform, so only the prefix is checked.
+        String result = done.get();
+        assertTrue(result.startsWith("exited with return code "));
+    }
+
+    /**
+     * Test running a process which needs to be force-killed.  The script checks
+     * the start message, ignores SIGTERM, reports a status and then loops until
+     * it is killed.
+     */
+    @Test
+    public void testProcessForceKillTimeout() throws Exception {
+        if (OperatingSystem.IS_WINDOWS) return;
+        final String[] script = new String[] {
+            "echo hello world\n",
+            "# Test that the initial message is sent correctly.\n",
+            "read -r line\n",
+            "[[ $line == '{\"id\":\"testForceKillTask\",\"workload\":{\"foo\":\"value1\",\"bar\":123}}' ]] || exit 0\n",
+            "\n",
+            "# Ignore SIGTERM signals.  This ensures that we test SIGKILL delivery.\n",
+            "trap 'echo SIGTERM' SIGTERM\n",
+            "\n",
+            "# Update the process status.  This will also unblock the junit test.\n",
+            "# It is important that we do this after we disabled SIGTERM, to ensure\n",
+            "# that we are testing SIGKILL.\n",
+            "echo '{\"status\": \"green\", \"log\": \"my log message.\"}'\n",
+            "\n",
+            "# Wait for the SIGKILL.\n",
+            "while true; do sleep 0.01; done\n"};
+        File scriptFile = null;
+        try {
+            scriptFile = TestUtils.tempFile();
+            try (OutputStream out = Files.newOutputStream(scriptFile.toPath())) {
+                for (String scriptLine : script) {
+                    out.write(scriptLine.getBytes(StandardCharsets.UTF_8));
+                }
+            }
+            // Completed with the first status which the script reports.
+            CompletableFuture<String> statusFuture = new CompletableFuture<>();
+            final WorkerStatusTracker statusTracker = new WorkerStatusTracker() {
+                @Override
+                public void update(JsonNode status) {
+                    statusFuture.complete(status.textValue().toString());
+                }
+            };
+            // A 1 ms grace period means the SIGTERM is followed almost at once by a SIGKILL.
+            ExternalCommandWorker worker = new ExternalCommandWorkerBuilder("testForceKillTask").
+                shutdownGracePeriodMs(1).
+                command("bash", scriptFile.getAbsolutePath()).
+                build();
+            KafkaFutureImpl<String> done = new KafkaFutureImpl<>();
+            worker.start(null, statusTracker, done);
+            assertEquals("green", statusFuture.get());
+            worker.stop(null);
+            assertTrue(done.get().startsWith("exited with return code "));
+        } finally {
+            if (scriptFile != null) {
+                Files.delete(scriptFile.toPath());
+            }
+        }
+    }
+}

Reply via email to