Repository: kafka Updated Branches: refs/heads/trunk 53c5ccb33 -> d9cbc6b1a
KAFKA-5811; Add Kibosh integration for Trogdor and Ducktape For ducktape: add Kibosh to the testing Dockerfile. Create files_unreadable_fault_spec.py. For trogdor: create FilesUnreadableFaultSpec.java. Add a unit test of using the Kibosh service. Author: Colin P. Mccabe <[email protected]> Reviewers: Rajini Sivaram <[email protected]> Closes #4195 from cmccabe/KAFKA-5811 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9cbc6b1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9cbc6b1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9cbc6b1 Branch: refs/heads/trunk Commit: d9cbc6b1a28e2fb3f88137429e08c12201998bc7 Parents: 53c5ccb Author: Colin P. Mccabe <[email protected]> Authored: Thu Nov 16 17:59:24 2017 +0000 Committer: Rajini Sivaram <[email protected]> Committed: Thu Nov 16 17:59:24 2017 +0000 ---------------------------------------------------------------------- checkstyle/suppressions.xml | 2 +- tests/docker/Dockerfile | 4 + .../trogdor/files_unreadable_fault_spec.py | 58 ++++++ tests/kafkatest/services/trogdor/kibosh.py | 152 ++++++++++++++ tests/kafkatest/tests/tools/kibosh_test.py | 81 ++++++++ .../trogdor/fault/FilesUnreadableFaultSpec.java | 82 ++++++++ .../org/apache/kafka/trogdor/fault/Kibosh.java | 199 +++++++++++++++++++ .../trogdor/fault/KiboshFaultController.java | 36 ++++ .../kafka/trogdor/fault/KiboshFaultWorker.java | 56 ++++++ .../apache/kafka/trogdor/agent/AgentTest.java | 78 ++++++++ 10 files changed, 747 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/checkstyle/suppressions.xml ---------------------------------------------------------------------- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 7c32eb0..6218794 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -45,7 +45,7 @@ <suppress 
checks="ClassDataAbstractionCoupling" files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/> <suppress checks="ClassDataAbstractionCoupling" - files="(Errors|SaslAuthenticatorTest).java"/> + files="(Errors|SaslAuthenticatorTest|AgentTest).java"/> <suppress checks="BooleanExpressionComplexity" files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/> http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tests/docker/Dockerfile ---------------------------------------------------------------------- diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 3ca9993..d855fe4 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -48,6 +48,10 @@ RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "${MIRROR}kafka/0.10.1.1/kafka_2.1 RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1" RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s "${MIRROR}kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0" +# Install Kibosh +RUN apt-get install -y fuse +RUN cd /opt && git clone -q https://github.com/confluentinc/kibosh.git && cd "/opt/kibosh" && git reset --hard 399de967c5520e8fe6e32d4e5c1c55d71cd7f46c && mkdir "/opt/kibosh/build" && cd "/opt/kibosh/build" && ../configure && make -j 2 + # Set up the ducker user.
RUN useradd -ms /bin/bash ducker && mkdir -p /home/ducker/ && rsync -aiq /root/.ssh/ /home/ducker/.ssh && chown -R ducker /home/ducker/ /mnt/ && echo 'ducker ALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers USER ducker http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py b/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py new file mode 100644 index 0000000..4f0540a --- /dev/null +++ b/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafkatest.services.trogdor.task_spec import TaskSpec + + +class FilesUnreadableFaultSpec(TaskSpec): + """ + The specification for a fault which makes files unreadable. + """ + + def __init__(self, start_ms, duration_ms, node_names, mount_path, + prefix, error_code): + """ + Create a new FilesUnreadableFaultSpec. + + :param start_ms: The start time, as described in task_spec.py + :param duration_ms: The duration in milliseconds. + :param node_names: The names of the node(s) to create the fault on. 
+ :param mount_path: The mount path. + :param prefix: The prefix within the mount point to make unreadable. + :param error_code: The error code to use. + """ + super(FilesUnreadableFaultSpec, self).__init__(start_ms, duration_ms) + self.node_names = node_names + self.mount_path = mount_path + self.prefix = prefix + self.error_code = error_code + + def message(self): + return { + "class": "org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec", + "startMs": self.start_ms, + "durationMs": self.duration_ms, + "nodeNames": self.node_names, + "mountPath": self.mount_path, + "prefix": self.prefix, + "errorCode": self.error_code, + } + + def kibosh_message(self): + return { + "type": "unreadable", + "prefix": self.prefix, + "code": self.error_code, + } http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tests/kafkatest/services/trogdor/kibosh.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/trogdor/kibosh.py b/tests/kafkatest/services/trogdor/kibosh.py new file mode 100644 index 0000000..1bd4224 --- /dev/null +++ b/tests/kafkatest/services/trogdor/kibosh.py @@ -0,0 +1,152 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import os.path + +from ducktape.services.service import Service +from ducktape.utils import util + + +class KiboshService(Service): + """ + Kibosh is a fault-injecting FUSE filesystem. + + Attributes: + INSTALL_ROOT The path of where Kibosh is installed. + BINARY_NAME The Kibosh binary name. + BINARY_PATH The path to the kibosh binary. + """ + INSTALL_ROOT = "/opt/kibosh/build" + BINARY_NAME = "kibosh" + BINARY_PATH = os.path.join(INSTALL_ROOT, BINARY_NAME) + + def __init__(self, context, nodes, target, mirror, persist="/mnt/kibosh"): + """ + Create a Kibosh service. + + :param context: The TestContext object. + :param nodes: The nodes to put the Kibosh FS on. Kibosh allocates no + nodes of its own. + :param target: The target directory, which Kibosh exports a view of. + :param mirror: The mirror directory, where Kibosh injects faults. + :param persist: Where the log files and pid files will be created. + """ + Service.__init__(self, context, num_nodes=0) + if (len(nodes) == 0): + raise RuntimeError("You must supply at least one node to run the service on.") + for node in nodes: + self.nodes.append(node) + + self.target = target + self.mirror = mirror + self.persist = persist + + self.control_path = os.path.join(self.mirror, "kibosh_control") + self.pidfile_path = os.path.join(self.persist, "pidfile") + self.stdout_stderr_path = os.path.join(self.persist, "kibosh-stdout-stderr.log") + self.log_path = os.path.join(self.persist, "kibosh.log") + self.logs = { + "kibosh-stdout-stderr.log": { + "path": self.stdout_stderr_path, + "collect_default": True}, + "kibosh.log": { + "path": self.log_path, + "collect_default": True} + } + + def free(self): + """Clear the nodes list.""" + # Because the filesystem runs on nodes which have been allocated by other services, those nodes + # are not deallocated here. 
+ self.nodes = [] + Service.free(self) + + def kibosh_running(self, node): + return 0 == node.account.ssh("test -e '%s'" % self.control_path, allow_fail=True) + + def start_node(self, node): + node.account.mkdirs(self.persist) + cmd = "sudo -E " + cmd += " %s" % KiboshService.BINARY_PATH + cmd += " --target %s" % self.target + cmd += " --pidfile %s" % self.pidfile_path + cmd += " --log %s" % self.log_path + cmd += " --verbose" + cmd += " %s" % self.mirror + cmd += " &> %s" % self.stdout_stderr_path + node.account.ssh(cmd) + util.wait_until(lambda: self.kibosh_running(node), 20, backoff_sec=.1, + err_msg="Timed out waiting for kibosh to start on %s" % node.account.hostname) + + def pids(self, node): + return [pid for pid in node.account.ssh_capture("test -e '%s' && test -e /proc/$(cat '%s')" % + (self.pidfile_path, self.pidfile_path), allow_fail=True)] + + def wait_node(self, node, timeout_sec=None): + return len(self.pids(node)) == 0 + + def kibosh_process_running(self, node): + pids = self.pids(node) + if len(pids) == 0: + return True + return False + + def stop_node(self, node): + """Halt kibosh process(es) on this node.""" + node.account.logger.debug("stop_node(%s): unmounting %s" % (node.name, self.mirror)) + node.account.ssh("sudo fusermount -u %s" % self.mirror, allow_fail=True) + # Wait for the kibosh process to terminate. + try: + util.wait_until(lambda: self.kibosh_process_running(node), 20, backoff_sec=.1, + err_msg="Timed out waiting for kibosh to stop on %s" % node.account.hostname) + except TimeoutError: + # If the process won't terminate, use kill -9 to shut it down. 
+ node.account.logger.debug("stop_node(%s): killing the kibosh process managing %s" % (node.name, self.mirror)) + node.account.ssh("sudo kill -9 %s" % (" ".join(self.pids(node))), allow_fail=True) + node.account.ssh("sudo fusermount -u %s" % self.mirror) + util.wait_until(lambda: self.kibosh_process_running(node), 20, backoff_sec=.1, + err_msg="Timed out waiting for kibosh to stop on %s" % node.account.hostname) + + def clean_node(self, node): + """Clean up persistent state on this node - e.g. service logs, configuration files etc.""" + self.stop_node(node) + node.account.ssh("rm -rf -- %s" % self.persist) + + def set_faults(self, node, specs): + """ + Set the currently active faults. + + :param node: The node. + :param specs: An array of FaultSpec objects describing the faults. + """ + fault_array = [spec.kibosh_message() for spec in specs] + obj = { 'faults': fault_array } + obj_json = json.dumps(obj) + node.account.create_file(self.control_path, obj_json) + + def get_fault_json(self, node): + """ + Return a JSON string which contains the currently active faults. + + :param node: The node. + + :returns: The fault JSON describing the faults. + """ + iter = node.account.ssh_capture("cat '%s'" % self.control_path) + text = "" + for line in iter: + text = "%s%s" % (text, line.rstrip("\r\n")) + return text http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tests/kafkatest/tests/tools/kibosh_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/tools/kibosh_test.py b/tests/kafkatest/tests/tools/kibosh_test.py new file mode 100644 index 0000000..5844c27 --- /dev/null +++ b/tests/kafkatest/tests/tools/kibosh_test.py @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.cluster.cluster_spec import ClusterSpec +from ducktape.mark.resource import cluster +from ducktape.tests.test import Test +from ducktape.utils.util import wait_until +from kafkatest.services.trogdor.task_spec import TaskSpec +from kafkatest.services.trogdor.files_unreadable_fault_spec import FilesUnreadableFaultSpec +from kafkatest.services.trogdor.kibosh import KiboshService +import json + + +class KiboshTest(Test): + TARGET = "/mnt/kibosh-target" + MIRROR = "/mnt/kibosh-mirror" + + """ + Tests the Kibosh fault injection filesystem in isolation. 
+ """ + + def __init__(self, test_context): + super(KiboshTest, self).__init__(test_context) + + def set_up_kibosh(self, num_nodes): + self.nodes = self.test_context.cluster.alloc(ClusterSpec.simple_linux(num_nodes)) + for node in self.nodes: + node.account.logger = self.logger + node.account.ssh("mkdir -p -- %s %s" % (KiboshTest.TARGET, KiboshTest.MIRROR)) + self.kibosh = KiboshService(self.test_context, self.nodes, + KiboshTest.TARGET, KiboshTest.MIRROR) + for node in self.nodes: + node.account.logger = self.kibosh.logger + self.kibosh.start() + + def setUp(self): + self.kibosh = None + self.nodes = None + + def tearDown(self): + if self.kibosh is not None: + self.kibosh.stop() + self.kibosh = None + for node in self.nodes: + node.account.ssh("rm -rf -- %s %s" % (KiboshTest.TARGET, KiboshTest.MIRROR)) + if self.nodes is not None: + self.test_context.cluster.free(self.nodes) + self.nodes = None + + @cluster(num_nodes=4) + def test_kibosh_service(self): + """ + Test that we can bring up kibosh and create a fault. + """ + self.set_up_kibosh(3) + spec = FilesUnreadableFaultSpec(0, TaskSpec.MAX_DURATION_MS, + [self.nodes[0].name], KiboshTest.TARGET, "/foo", 12) + node = self.nodes[0] + + def check(self, node): + fault_json = self.kibosh.get_fault_json(node) + expected_json = json.dumps({"faults": [spec.kibosh_message()]}) + self.logger.info("Read back: [%s]. Expected: [%s]."
% (fault_json, expected_json)) + return fault_json == expected_json + + self.kibosh.set_faults(node, [spec]) + wait_until(lambda: check(self, node), + timeout_sec=10, backoff_sec=.2, err_msg="Failed to read back fault array.") http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java new file mode 100644 index 0000000..1fbf9d0 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.trogdor.fault; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.fault.Kibosh.KiboshFilesUnreadableFaultSpec; +import org.apache.kafka.trogdor.task.TaskController; +import org.apache.kafka.trogdor.task.TaskSpec; +import org.apache.kafka.trogdor.task.TaskWorker; + +import java.util.Set; + +/** + * The specification for a fault that makes files unreadable. + */ +public class FilesUnreadableFaultSpec extends TaskSpec { + private final Set<String> nodeNames; + private final String mountPath; + private final String prefix; + private final int errorCode; + + @JsonCreator + public FilesUnreadableFaultSpec(@JsonProperty("startMs") long startMs, + @JsonProperty("durationMs") long durationMs, + @JsonProperty("nodeNames") Set<String> nodeNames, + @JsonProperty("mountPath") String mountPath, + @JsonProperty("prefix") String prefix, + @JsonProperty("errorCode") int errorCode) { + super(startMs, durationMs); + this.nodeNames = nodeNames; + this.mountPath = mountPath; + this.prefix = prefix; + this.errorCode = errorCode; + } + + @JsonProperty + public Set<String> nodeNames() { + return nodeNames; + } + + @JsonProperty + public String mountPath() { + return mountPath; + } + + @JsonProperty + public String prefix() { + return prefix; + } + + @JsonProperty + public int errorCode() { + return errorCode; + } + + @Override + public TaskController newController(String id) { + return new KiboshFaultController(nodeNames); + } + + @Override + public TaskWorker newTaskWorker(String id) { + return new KiboshFaultWorker(id, + new KiboshFilesUnreadableFaultSpec(prefix, errorCode), mountPath); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java 
b/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java new file mode 100644 index 0000000..6fa1a4b --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.trogdor.fault; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.kafka.trogdor.common.JsonUtil; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.TreeMap; + +public final class Kibosh { + public static final Kibosh INSTANCE = new Kibosh(); + + public final static String KIBOSH_CONTROL = "kibosh_control"; + + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "type") + @JsonSubTypes({ + @JsonSubTypes.Type(value = KiboshFilesUnreadableFaultSpec.class, name = "unreadable"), + }) + public static abstract class KiboshFaultSpec { + @Override + public final boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + return Objects.equals(toString(), o.toString()); + } + + @Override + public final int hashCode() { + return toString().hashCode(); + } + + @Override + public final String toString() { + return JsonUtil.toJsonString(this); + } + } + + public static class KiboshFilesUnreadableFaultSpec extends KiboshFaultSpec { + private final String prefix; + private final int errorCode; + + @JsonCreator + public KiboshFilesUnreadableFaultSpec(@JsonProperty("prefix") String prefix, + @JsonProperty("errorCode") int errorCode) { + this.prefix = prefix; + this.errorCode = errorCode; + } + + @JsonProperty + public String prefix() { + return prefix; + } + + @JsonProperty + public int errorCode() { + return errorCode; + } + } + + private static class KiboshProcess { + private final Path controlPath; + + KiboshProcess(String mountPath) { + this.controlPath = 
Paths.get(mountPath, KIBOSH_CONTROL); + if (!Files.exists(controlPath)) { + throw new RuntimeException("Can't find file " + controlPath); + } + } + + synchronized void addFault(KiboshFaultSpec toAdd) throws IOException { + KiboshControlFile file = KiboshControlFile.read(controlPath); + List<KiboshFaultSpec> faults = new ArrayList<>(file.faults()); + faults.add(toAdd); + new KiboshControlFile(faults).write(controlPath); + } + + synchronized void removeFault(KiboshFaultSpec toRemove) throws IOException { + KiboshControlFile file = KiboshControlFile.read(controlPath); + List<KiboshFaultSpec> faults = new ArrayList<>(); + boolean foundToRemove = false; + for (KiboshFaultSpec fault : file.faults()) { + if (fault.equals(toRemove)) { + foundToRemove = true; + } else { + faults.add(fault); + } + } + if (!foundToRemove) { + throw new RuntimeException("Failed to find fault " + toRemove + ". "); + } + new KiboshControlFile(faults).write(controlPath); + } + } + + public static class KiboshControlFile { + private final List<KiboshFaultSpec> faults; + + public final static KiboshControlFile EMPTY = + new KiboshControlFile(Collections.<KiboshFaultSpec>emptyList()); + + public static KiboshControlFile read(Path controlPath) throws IOException { + byte[] controlFileBytes = Files.readAllBytes(controlPath); + return JsonUtil.JSON_SERDE.readValue(controlFileBytes, KiboshControlFile.class); + } + + @JsonCreator + public KiboshControlFile(@JsonProperty("faults") List<KiboshFaultSpec> faults) { + this.faults = faults; + } + + @JsonProperty + public List<KiboshFaultSpec> faults() { + return faults; + } + + public void write(Path controlPath) throws IOException { + Files.write(controlPath, JsonUtil.JSON_SERDE.writeValueAsBytes(this)); + } + + @Override + public final boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + return Objects.equals(toString(), o.toString()); + } + + @Override + public final int hashCode() { + 
return toString().hashCode(); + } + + @Override + public final String toString() { + return JsonUtil.toJsonString(this); + } + } + + private final TreeMap<String, KiboshProcess> processes = new TreeMap<>(); + + private Kibosh() { + } + + /** + * Get or create a KiboshProcess object to manage the Kibosh process at a given path. + */ + private synchronized KiboshProcess findProcessObject(String mountPath) { + String path = Paths.get(mountPath).normalize().toString(); + KiboshProcess process = processes.get(path); + if (process == null) { + process = new KiboshProcess(mountPath); + processes.put(path, process); + } + return process; + } + + /** + * Add a new Kibosh fault. + */ + void addFault(String mountPath, KiboshFaultSpec spec) throws IOException { + KiboshProcess process = findProcessObject(mountPath); + process.addFault(spec); + } + + /** + * Remove a Kibosh fault. + */ + void removeFault(String mountPath, KiboshFaultSpec spec) throws IOException { + KiboshProcess process = findProcessObject(mountPath); + process.removeFault(spec); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultController.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultController.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultController.java new file mode 100644 index 0000000..140abf1 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultController.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import org.apache.kafka.trogdor.common.Topology; +import org.apache.kafka.trogdor.task.TaskController; + +import java.util.Set; + +public class KiboshFaultController implements TaskController { + private final Set<String> nodeNames; + + public KiboshFaultController(Set<String> nodeNames) { + this.nodeNames = nodeNames; + } + + @Override + public Set<String> targetNodes(Topology topology) { + return nodeNames; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java new file mode 100644 index 0000000..629d15e --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.trogdor.common.Platform; +import org.apache.kafka.trogdor.fault.Kibosh.KiboshFaultSpec; +import org.apache.kafka.trogdor.task.TaskWorker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicReference; + +public class KiboshFaultWorker implements TaskWorker { + private static final Logger log = LoggerFactory.getLogger(KiboshFaultWorker.class); + + private final String id; + + private final KiboshFaultSpec spec; + + private final String mountPath; + + public KiboshFaultWorker(String id, KiboshFaultSpec spec, String mountPath) { + this.id = id; + this.spec = spec; + this.mountPath = mountPath; + } + + @Override + public void start(Platform platform, AtomicReference<String> status, + KafkaFutureImpl<String> errorFuture) throws Exception { + log.info("Activating {} {}: {}.", spec.getClass().getSimpleName(), id, spec); + Kibosh.INSTANCE.addFault(mountPath, spec); + } + + @Override + public void stop(Platform platform) throws Exception { + log.info("Deactivating {} {}: {}.", spec.getClass().getSimpleName(), id, spec); + Kibosh.INSTANCE.removeFault(mountPath, spec); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d9cbc6b1/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java 
b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java index 342fefc..b5fa001 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java @@ -20,12 +20,18 @@ package org.apache.kafka.trogdor.agent; import org.apache.kafka.common.utils.MockScheduler; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Scheduler; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; import org.apache.kafka.trogdor.basic.BasicNode; import org.apache.kafka.trogdor.basic.BasicPlatform; import org.apache.kafka.trogdor.basic.BasicTopology; import org.apache.kafka.trogdor.common.ExpectedTasks; import org.apache.kafka.trogdor.common.ExpectedTasks.ExpectedTaskBuilder; import org.apache.kafka.trogdor.common.Node; +import org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec; +import org.apache.kafka.trogdor.fault.Kibosh; +import org.apache.kafka.trogdor.fault.Kibosh.KiboshControlFile; +import org.apache.kafka.trogdor.fault.Kibosh.KiboshFilesUnreadableFaultSpec; import org.apache.kafka.trogdor.rest.AgentStatusResponse; import org.apache.kafka.trogdor.rest.CreateWorkerRequest; @@ -36,10 +42,16 @@ import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.task.NoOpTaskSpec; import org.apache.kafka.trogdor.task.SampleTaskSpec; +import org.junit.Assert; import org.junit.Rule; import org.junit.rules.Timeout; import org.junit.Test; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.TreeMap; @@ -235,4 +247,70 @@ public class AgentTest { build()). 
waitFor(client); } + + private static class MockKibosh implements AutoCloseable { + private final File tempDir; + private final Path controlFile; + + MockKibosh() throws IOException { + tempDir = TestUtils.tempDirectory(); + controlFile = Paths.get(tempDir.toPath().toString(), Kibosh.KIBOSH_CONTROL); + KiboshControlFile.EMPTY.write(controlFile); + } + + KiboshControlFile read() throws IOException { + return KiboshControlFile.read(controlFile); + } + + @Override + public void close() throws Exception { + Utils.delete(tempDir); + } + } + + @Test + public void testKiboshFaults() throws Exception { + MockTime time = new MockTime(0, 0, 0); + MockScheduler scheduler = new MockScheduler(time); + Agent agent = createAgent(scheduler); + AgentClient client = new AgentClient(10, "localhost", agent.port()); + new ExpectedTasks().waitFor(client); + + try (MockKibosh mockKibosh = new MockKibosh()) { + Assert.assertEquals(KiboshControlFile.EMPTY, mockKibosh.read()); + FilesUnreadableFaultSpec fooSpec = new FilesUnreadableFaultSpec(0, 900000, + Collections.singleton("myAgent"), mockKibosh.tempDir.getPath().toString(), "/foo", 123); + client.createWorker(new CreateWorkerRequest("foo", fooSpec)); + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + workerState(new WorkerRunning(fooSpec, 0, "")). + build()). + waitFor(client); + Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList( + new KiboshFilesUnreadableFaultSpec("/foo", 123))), mockKibosh.read()); + FilesUnreadableFaultSpec barSpec = new FilesUnreadableFaultSpec(0, 900000, + Collections.singleton("myAgent"), mockKibosh.tempDir.getPath().toString(), "/bar", 456); + client.createWorker(new CreateWorkerRequest("bar", barSpec)); + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + workerState(new WorkerRunning(fooSpec, 0, "")).build()). + addTask(new ExpectedTaskBuilder("bar"). + workerState(new WorkerRunning(barSpec, 0, "")).build()). 
+ waitFor(client); + Assert.assertEquals(new KiboshControlFile(new ArrayList<Kibosh.KiboshFaultSpec>() {{ + add(new KiboshFilesUnreadableFaultSpec("/foo", 123)); + add(new KiboshFilesUnreadableFaultSpec("/bar", 456)); + }}), mockKibosh.read()); + time.sleep(1); + client.stopWorker(new StopWorkerRequest("foo")); + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + workerState(new WorkerDone(fooSpec, 0, 1, "", "")).build()). + addTask(new ExpectedTaskBuilder("bar"). + workerState(new WorkerRunning(barSpec, 0, "")).build()). + waitFor(client); + Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList( + new KiboshFilesUnreadableFaultSpec("/bar", 456))), mockKibosh.read()); + } + } };
