http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java new file mode 100644 index 0000000..9f1a19a --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import org.apache.kafka.trogdor.common.Platform; +import org.apache.kafka.trogdor.common.Topology; + +import java.util.Set; + +public interface Fault { + /** + * Get the ID of this fault. + */ + String id(); + + /** + * Get the specification for this Fault. + */ + FaultSpec spec(); + + /** + * Activate the fault. + */ + void activate(Platform platform) throws Exception; + + /** + * Deactivate the fault. + */ + void deactivate(Platform platform) throws Exception; + + /** + * Get the nodes which this fault is targetting. + */ + Set<String> targetNodes(Topology topology); +}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java new file mode 100644 index 0000000..63e5ff4 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.TreeMap; + +public class FaultSet { + private final static long NS_PER_MS = 1000000L; + + /** + * Maps fault start times in nanoseconds to faults. + */ + private final TreeMap<Long, Fault> byStart = new TreeMap<Long, Fault>(); + + /** + * Maps fault end times in nanoseconds to faults. + */ + private final TreeMap<Long, Fault> byEnd = new TreeMap<Long, Fault>(); + + /** + * Return an iterator that iterates over the fault set in start time order. 
+ */ + public FaultSetIterator iterateByStart() { + return new FaultSetIterator(byStart); + } + + /** + * Return an iterator that iterates over the fault set in end time order. + */ + public FaultSetIterator iterateByEnd() { + return new FaultSetIterator(byEnd); + } + + /** + * Add a new fault to the FaultSet. + */ + public void add(Fault fault) { + insertUnique(byStart, fault.spec().startMs() * NS_PER_MS, fault); + long endMs = fault.spec().startMs() + fault.spec().durationMs(); + insertUnique(byEnd, endMs * NS_PER_MS, fault); + } + + /** + * Insert a new fault to a TreeMap. + * + * If there is already a fault with the given key, the fault will be stored + * with the next available key. + */ + private void insertUnique(TreeMap<Long, Fault> map, long key, Fault fault) { + while (true) { + Fault existing = map.get(key); + if (existing == null) { + map.put(key, fault); + return; + } else if (existing == fault) { + return; + } else { + key++; + } + } + } + + /** + * Remove a fault from the TreeMap. The fault is removed by object equality. + */ + public void remove(Fault fault) { + removeUnique(byStart, fault.spec().startMs() * NS_PER_MS, fault); + long endMs = fault.spec().startMs() + fault.spec().durationMs(); + removeUnique(byEnd, endMs * NS_PER_MS, fault); + } + + /** + * Helper function to remove a fault from a map. We will search every + * element of the map equal to or higher than the given key. + */ + private void removeUnique(TreeMap<Long, Fault> map, long key, Fault fault) { + while (true) { + Map.Entry<Long, Fault> existing = map.ceilingEntry(key); + if (existing == null) { + throw new NoSuchElementException("No such element as " + fault); + } else if (existing.getValue() == fault) { + map.remove(existing.getKey()); + return; + } else { + key = existing.getKey() + 1; + } + } + } + + /** + * An iterator over the FaultSet. 
+ */ + class FaultSetIterator implements Iterator<Fault> { + private final TreeMap<Long, Fault> map; + private Fault cur = null; + private long prevKey = -1; + + FaultSetIterator(TreeMap<Long, Fault> map) { + this.map = map; + } + + @Override + public boolean hasNext() { + Map.Entry<Long, Fault> entry = map.higherEntry(prevKey); + return entry != null; + } + + @Override + public Fault next() { + Map.Entry<Long, Fault> entry = map.higherEntry(prevKey); + if (entry == null) { + throw new NoSuchElementException(); + } + prevKey = entry.getKey(); + cur = entry.getValue(); + return cur; + } + + @Override + public void remove() { + if (cur == null) { + throw new IllegalStateException(); + } + FaultSet.this.remove(cur); + cur = null; + } + } +}; http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java new file mode 100644 index 0000000..e15c4e9 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.kafka.common.utils.Utils; + + +/** + * The specification for a fault. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, + include = JsonTypeInfo.As.PROPERTY, + property = "class") +public interface FaultSpec { + class Util { + private static final String SPEC_STRING = "Spec"; + + public static Fault createFault(String faultId, FaultSpec faultSpec) throws ClassNotFoundException { + String faultSpecClassName = faultSpec.getClass().getName(); + if (!faultSpecClassName.endsWith(SPEC_STRING)) { + throw new RuntimeException("FaultSpec class name must end with " + SPEC_STRING); + } + String faultClassName = faultSpecClassName.substring(0, + faultSpecClassName.length() - SPEC_STRING.length()); + return Utils.newParameterizedInstance(faultClassName, + String.class, faultId, + FaultSpec.class, faultSpec); + } + } + + /** + * Get the start time of this fault in ms. + */ + @JsonProperty + long startMs(); + + /** + * Get the duration of this fault in ms. + */ + @JsonProperty + long durationMs(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java new file mode 100644 index 0000000..bec0792 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import com.fasterxml.jackson.annotation.JsonFormat; + +@JsonFormat(shape = JsonFormat.Shape.STRING) +public enum FaultState { + PENDING, + RUNNING, + DONE +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java new file mode 100644 index 0000000..7524af1 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import org.apache.kafka.trogdor.common.JsonUtil; +import org.apache.kafka.trogdor.common.Node; +import org.apache.kafka.trogdor.common.Platform; +import org.apache.kafka.trogdor.common.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; + +public class NetworkPartitionFault implements Fault { + private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFault.class); + + private final String id; + private final NetworkPartitionFaultSpec spec; + private final List<Set<String>> partitions; + + public NetworkPartitionFault(String id, FaultSpec spec) { + this.id = id; + this.spec = (NetworkPartitionFaultSpec) spec; + this.partitions = new ArrayList<>(); + HashSet<String> prevNodes = new HashSet<>(); + for (List<String> partition : this.spec.partitions()) { + for (String nodeName : partition) { + if (prevNodes.contains(nodeName)) { + throw new RuntimeException("Node " + nodeName + + " appears in more than one partition."); + } + prevNodes.add(nodeName); + this.partitions.add(new HashSet<String>(partition)); + } + } + } + + @Override + public String id() { + return id; + } + + @Override + public FaultSpec spec() { + return spec; + } + + @Override + public void activate(Platform platform) throws Exception { + log.info("Activating NetworkPartitionFault..."); + runIptablesCommands(platform, 
"-A"); + } + + @Override + public void deactivate(Platform platform) throws Exception { + log.info("Deactivating NetworkPartitionFault..."); + runIptablesCommands(platform, "-D"); + } + + private void runIptablesCommands(Platform platform, String iptablesAction) throws Exception { + Node curNode = platform.curNode(); + Topology topology = platform.topology(); + TreeSet<String> toBlock = new TreeSet<>(); + for (Set<String> partition : partitions) { + if (!partition.contains(curNode.name())) { + for (String nodeName : partition) { + toBlock.add(nodeName); + } + } + } + for (String nodeName : toBlock) { + Node node = topology.node(nodeName); + InetAddress addr = InetAddress.getByName(node.hostname()); + platform.runCommand(new String[] { + "sudo", "iptables", iptablesAction, "INPUT", "-p", "tcp", "-s", + addr.getHostAddress(), "-j", "DROP", "-m", "comment", "--comment", nodeName + }); + } + } + + @Override + public Set<String> targetNodes(Topology topology) { + Set<String> targetNodes = new HashSet<>(); + for (Set<String> partition : partitions) { + targetNodes.addAll(partition); + } + return targetNodes; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NetworkPartitionFault that = (NetworkPartitionFault) o; + return Objects.equals(id, that.id) && + Objects.equals(spec, that.spec) && + Objects.equals(partitions, that.partitions); + } + + @Override + public int hashCode() { + return Objects.hash(id, spec, partitions); + } + + @Override + public String toString() { + return "NoOpFault(id=" + id + ", spec=" + JsonUtil.toJsonString(spec) + ")"; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java 
b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java new file mode 100644 index 0000000..d734dce --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.common.JsonUtil; + +import java.util.List; +import java.util.Objects; + +/** + * The specification for a fault that creates a network partition. 
+ */ +public class NetworkPartitionFaultSpec extends AbstractFaultSpec { + private final List<List<String>> partitions; + + @JsonCreator + public NetworkPartitionFaultSpec(@JsonProperty("startMs") long startMs, + @JsonProperty("durationMs") long durationMs, + @JsonProperty("partitions") List<List<String>> partitions) { + super(startMs, durationMs); + this.partitions = partitions; + } + + @JsonProperty + public List<List<String>> partitions() { + return partitions; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NetworkPartitionFaultSpec that = (NetworkPartitionFaultSpec) o; + return Objects.equals(startMs(), that.startMs()) && + Objects.equals(durationMs(), that.durationMs()) && + Objects.equals(partitions, that.partitions); + } + + @Override + public int hashCode() { + return Objects.hash(startMs(), durationMs(), partitions); + } + + @Override + public String toString() { + return JsonUtil.toJsonString(this); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java new file mode 100644 index 0000000..c7ac4de --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import org.apache.kafka.trogdor.common.JsonUtil; +import org.apache.kafka.trogdor.common.Node; +import org.apache.kafka.trogdor.common.Platform; +import org.apache.kafka.trogdor.common.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class NoOpFault implements Fault { + private static final Logger log = LoggerFactory.getLogger(NoOpFault.class); + + private final String id; + private final FaultSpec spec; + + public NoOpFault(String id, FaultSpec spec) { + this.id = id; + this.spec = spec; + } + + @Override + public String id() { + return id; + } + + @Override + public FaultSpec spec() { + return spec; + } + + @Override + public void activate(Platform platform) { + log.info("Activating NoOpFault..."); + } + + @Override + public void deactivate(Platform platform) { + log.info("Deactivating NoOpFault..."); + } + + @Override + public Set<String> targetNodes(Topology topology) { + Set<String> set = new HashSet<>(); + for (Map.Entry<String, Node> entry : topology.nodes().entrySet()) { + if (Node.Util.getTrogdorAgentPort(entry.getValue()) > 0) { + set.add(entry.getKey()); + } + } + return set; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NoOpFault that = (NoOpFault) o; + return Objects.equals(id, that.id) && + Objects.equals(spec, that.spec); + } + + @Override + public int 
hashCode() { + return Objects.hash(id, spec); + } + + @Override + public String toString() { + return "NoOpFault(id=" + id + ", spec=" + JsonUtil.toJsonString(spec) + ")"; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java new file mode 100644 index 0000000..1d4b94d --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * The specification for a fault that does nothing. + * + * This fault type exists mainly to test the fault injection system. 
+ */ +public class NoOpFaultSpec extends AbstractFaultSpec { + @JsonCreator + public NoOpFaultSpec(@JsonProperty("startMs") long startMs, + @JsonProperty("durationMs") long durationMs) { + super(startMs, durationMs); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NoOpFaultSpec that = (NoOpFaultSpec) o; + return Objects.equals(startMs(), that.startMs()) && + Objects.equals(durationMs(), that.durationMs()); + } + + @Override + public int hashCode() { + return Objects.hash(startMs(), durationMs()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java new file mode 100644 index 0000000..a1b5246 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.common.JsonUtil; + +import java.util.Map; + +/** + * Response to GET /faults + */ +public class AgentFaultsResponse extends FaultDataMap { + @JsonCreator + public AgentFaultsResponse(@JsonProperty("faults") Map<String, FaultData> faults) { + super(faults); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AgentFaultsResponse that = (AgentFaultsResponse) o; + return super.equals(that); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + + @Override + public String toString() { + return JsonUtil.toJsonString(this); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java new file mode 100644 index 0000000..8e32f87 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.common.JsonUtil; + +import java.util.Objects; + +/** + * The status of the Trogdor agent. + */ +public class AgentStatusResponse { + private final long startTimeMs; + + @JsonCreator + public AgentStatusResponse(@JsonProperty("startTimeMs") long startTimeMs) { + this.startTimeMs = startTimeMs; + } + + @JsonProperty + public long startTimeMs() { + return startTimeMs; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AgentStatusResponse that = (AgentStatusResponse) o; + return Objects.equals(startTimeMs, that.startTimeMs); + } + + @Override + public int hashCode() { + return Objects.hash(startTimeMs); + } + + @Override + public String toString() { + return JsonUtil.toJsonString(this); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java new file mode 100644 index 0000000..df26274 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.common.JsonUtil; + +import java.util.Map; + +/** + * Response to GET /faults + */ +public class CoordinatorFaultsResponse extends FaultDataMap { + @JsonCreator + public CoordinatorFaultsResponse(@JsonProperty("faults") Map<String, FaultData> faults) { + super(faults); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CoordinatorFaultsResponse that = (CoordinatorFaultsResponse) o; + return super.equals(that); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + + @Override + public String toString() { + return JsonUtil.toJsonString(this); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java new file mode 100644 index 0000000..348e310 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.common.JsonUtil; + +import java.util.Objects; + +/** + * The status of the Trogdor coordinator. 
+ */ +public class CoordinatorStatusResponse { + private final long startTimeMs; + + @JsonCreator + public CoordinatorStatusResponse(@JsonProperty("startTimeMs") long startTimeMs) { + this.startTimeMs = startTimeMs; + } + + @JsonProperty + public long startTimeMs() { + return startTimeMs; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CoordinatorStatusResponse that = (CoordinatorStatusResponse) o; + return Objects.equals(startTimeMs, that.startTimeMs); + } + + @Override + public int hashCode() { + return Objects.hash(startTimeMs); + } + + @Override + public String toString() { + return JsonUtil.toJsonString(this); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java new file mode 100644 index 0000000..6e772d9 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.common.JsonUtil; +import org.apache.kafka.trogdor.fault.FaultSpec; + +import java.util.Objects; + +/** + * A request to the Trogdor agent to create a fault. + */ +public class CreateAgentFaultRequest { + private final String id; + private final FaultSpec spec; + + @JsonCreator + public CreateAgentFaultRequest(@JsonProperty("id") String id, + @JsonProperty("spec") FaultSpec spec) { + this.id = id; + this.spec = spec; + } + + @JsonProperty + public String id() { + return id; + } + + @JsonProperty + public FaultSpec spec() { + return spec; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CreateAgentFaultRequest that = (CreateAgentFaultRequest) o; + return Objects.equals(id, that.id) && + Objects.equals(spec, that.spec); + } + + @Override + public int hashCode() { + return Objects.hash(id, spec); + } + + @Override + public String toString() { + return JsonUtil.toJsonString(this); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java new file mode 100644 index 0000000..ec00cf3 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.common.JsonUtil; +import org.apache.kafka.trogdor.fault.FaultSpec; + +import java.util.Objects; + +/** + * A request to the Trogdor coordinator to create a fault. 
+ */ +public class CreateCoordinatorFaultRequest { + private final String id; + private final FaultSpec spec; + + @JsonCreator + public CreateCoordinatorFaultRequest(@JsonProperty("id") String id, + @JsonProperty("spec") FaultSpec spec) { + this.id = id; + this.spec = spec; + } + + @JsonProperty + public String id() { + return id; + } + + @JsonProperty + public FaultSpec spec() { + return spec; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CreateCoordinatorFaultRequest that = (CreateCoordinatorFaultRequest) o; + return Objects.equals(id, that.id) && + Objects.equals(spec, that.spec); + } + + @Override + public int hashCode() { + return Objects.hash(id, spec); + } + + @Override + public String toString() { + return JsonUtil.toJsonString(this); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/Empty.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/Empty.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/Empty.java new file mode 100644 index 0000000..da2fcba --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/Empty.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.kafka.trogdor.common.JsonUtil; + +/** + * An empty request or response. + */ +public class Empty { + public static final Empty INSTANCE = new Empty(); + + @JsonCreator + public Empty() { + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + return true; + } + + @Override + public int hashCode() { + return 1; + } + + @Override + public String toString() { + return JsonUtil.toJsonString(this); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/ErrorResponse.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/ErrorResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/ErrorResponse.java new file mode 100644 index 0000000..08bf6cd --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/ErrorResponse.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.common.JsonUtil; + +import java.util.Objects; + +/** + * An error response. + */ +public class ErrorResponse { + private final int code; + private final String message; + + @JsonCreator + public ErrorResponse(@JsonProperty("code") int code, + @JsonProperty("message") String message) { + this.code = code; + this.message = message; + } + + @JsonProperty + public int code() { + return code; + } + + @JsonProperty + public String message() { + return message; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ErrorResponse that = (ErrorResponse) o; + return Objects.equals(code, that.code) && + Objects.equals(message, that.message); + } + + @Override + public int hashCode() { + return Objects.hash(code, message); + } + + @Override + public String toString() { + return JsonUtil.toJsonString(this); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java new file mode 100644 index 0000000..773d519 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java 
@@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.common.JsonUtil; +import org.apache.kafka.trogdor.fault.FaultSpec; +import org.apache.kafka.trogdor.fault.FaultState; + +import java.util.Map; +import java.util.Objects; + +/** + * Response to GET /faults + */ +public class FaultDataMap { + private final Map<String, FaultData> faults; + + public static class FaultData { // Pairs a fault's specification with its current state. + private final FaultSpec spec; + private final FaultState state; + + @JsonCreator + public FaultData(@JsonProperty("spec") FaultSpec spec, + @JsonProperty("state") FaultState state) { // JSON name must match the state() accessor so that serialized data round-trips + this.spec = spec; + this.state = state; + } + + @JsonProperty + public FaultSpec spec() { + return spec; + } + + @JsonProperty + public FaultState state() { + return state; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FaultData that = (FaultData) o; + return Objects.equals(spec, that.spec) && + Objects.equals(state, that.state); + } + + @Override + public int hashCode() 
{ + return Objects.hash(spec, state); + } + } + + @JsonCreator + public FaultDataMap(@JsonProperty("faults") Map<String, FaultData> faults) { + this.faults = faults; + } + + @JsonProperty + public Map<String, FaultData> faults() { + return faults; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FaultDataMap that = (FaultDataMap) o; + return Objects.equals(faults, that.faults); + } + + @Override + public int hashCode() { + return Objects.hashCode(faults); + } + + @Override + public String toString() { + return JsonUtil.toJsonString(this); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java new file mode 100644 index 0000000..1b23a9e --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + +import org.apache.kafka.trogdor.common.JsonUtil; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.Slf4jRequestLog; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.server.handler.RequestLogHandler; +import org.eclipse.jetty.server.handler.StatisticsHandler; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.servlet.ServletContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; + +/** + * Embedded server for the REST API that provides the control plane for Trogdor. + */ +public class JsonRestServer { + private static final Logger log = LoggerFactory.getLogger(JsonRestServer.class); + + private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 2 * 1000; + + private final Server jettyServer; + + private final ServerConnector connector; + + /** + * Create a REST server for this herder using the specified configs. + * + * @param port The port number to use for the REST server, or + * 0 to use a random port. + */ + public JsonRestServer(int port) { + this.jettyServer = new Server(); + this.connector = new ServerConnector(jettyServer); + if (port > 0) { + connector.setPort(port); + } + jettyServer.setConnectors(new Connector[]{connector}); + } + + /** + * Start the JsonRestServer. 
+ * + * @param resources The path handling resources to register. + */ + public void start(Object... resources) { + log.info("Starting REST server"); + + ResourceConfig resourceConfig = new ResourceConfig(); + resourceConfig.register(new JacksonJsonProvider(JsonUtil.JSON_SERDE)); + for (Object resource : resources) { + resourceConfig.register(resource); + log.info("Registered resource {}", resource); + } + resourceConfig.register(RestExceptionMapper.class); + ServletContainer servletContainer = new ServletContainer(resourceConfig); + ServletHolder servletHolder = new ServletHolder(servletContainer); + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath("/"); + context.addServlet(servletHolder, "/*"); + + RequestLogHandler requestLogHandler = new RequestLogHandler(); + Slf4jRequestLog requestLog = new Slf4jRequestLog(); + requestLog.setLoggerName(JsonRestServer.class.getCanonicalName()); + requestLog.setLogLatency(true); + requestLogHandler.setRequestLog(requestLog); + + HandlerCollection handlers = new HandlerCollection(); + handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler}); + StatisticsHandler statsHandler = new StatisticsHandler(); + statsHandler.setHandler(handlers); + jettyServer.setHandler(statsHandler); + /* Needed for graceful shutdown as per `setStopTimeout` documentation */ + jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS); + jettyServer.setStopAtShutdown(true); + + try { + jettyServer.start(); + } catch (Exception e) { + throw new RuntimeException("Unable to start REST server", e); + } + log.info("REST server listening at " + jettyServer.getURI()); + } + + public int port() { + return connector.getLocalPort(); + } + + public void stop() { + log.info("Stopping REST server"); + + try { + jettyServer.stop(); + jettyServer.join(); + log.info("REST server stopped"); + } catch (Exception e) { + log.error("Unable to stop REST server", e); + } finally { + 
jettyServer.destroy(); + } + } + + /** Send an HTTP request with an optional JSON body and decode the JSON reply. + * @param url HTTP connection will be established with this url. + * @param method HTTP method ("GET", "POST", "PUT", etc.) + * @param requestBodyData Object to serialize as JSON and send in the request body. + * @param responseFormat Expected format of the response to the HTTP request. + * @param <T> The type of the deserialized response to the HTTP request. + * @return An HttpResponse holding either the deserialized body (2xx replies) or an ErrorResponse (204 and non-2xx replies); never null. + */ + public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData, + TypeReference<T> responseFormat) throws IOException { + HttpURLConnection connection = null; + try { + String serializedBody = requestBodyData == null ? null : + JsonUtil.JSON_SERDE.writeValueAsString(requestBodyData); + log.debug("Sending {} with input {} to {}", method, serializedBody, url); + connection = (HttpURLConnection) new URL(url).openConnection(); + connection.setRequestMethod(method); + connection.setRequestProperty("User-Agent", "kafka"); + connection.setRequestProperty("Accept", "application/json"); + + // connection.getResponseCode() implicitly calls getInputStream, so always set + // this to true. 
+ connection.setDoInput(true); + + connection.setUseCaches(false); + + if (requestBodyData != null) { + connection.setRequestProperty("Content-Type", "application/json"); + connection.setDoOutput(true); + + try (OutputStream os = connection.getOutputStream()) { // closed even if the write fails + os.write(serializedBody.getBytes(StandardCharsets.UTF_8)); + os.flush(); + } + } + + int responseCode = connection.getResponseCode(); + if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) { + return new HttpResponse<>(null, new ErrorResponse(responseCode, connection.getResponseMessage())); + } else if ((responseCode >= 200) && (responseCode < 300)) { + try (InputStream is = connection.getInputStream()) { // closed even if deserialization fails + T result = JsonUtil.JSON_SERDE.readValue(is, responseFormat); + return new HttpResponse<>(result, null); + } + } else { + // If the response code was not in the 200s, we assume that this is an error + // response. + try (InputStream es = connection.getErrorStream()) { // a null resource is permitted and is simply not closed + if (es == null) { + // Handle the case where HttpURLConnection#getErrorStream returns null. + return new HttpResponse<>(null, new ErrorResponse(responseCode, "")); + } + // Try to read the error response JSON. 
+ ErrorResponse error = JsonUtil.JSON_SERDE.readValue(es, ErrorResponse.class); + return new HttpResponse<>(null, error); + } + } + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + + public static class HttpResponse<T> { + private final T body; + private final ErrorResponse error; + + HttpResponse(T body, ErrorResponse error) { + this.body = body; + this.error = error; + } + + public T body() throws Exception { + if (error != null) { + throw RestExceptionMapper.toException(error.code(), error.message()); + } + return body; + } + + public ErrorResponse error() { + return error; + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java new file mode 100644 index 0000000..b063a9a --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.trogdor.rest; + +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.exc.InvalidTypeIdException; +import org.apache.kafka.common.errors.SerializationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.NotFoundException; +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ExceptionMapper; + +public class RestExceptionMapper implements ExceptionMapper<Throwable> { // Translates uncaught exceptions into ErrorResponse entities, and status codes back into exceptions. + private static final Logger log = LoggerFactory.getLogger(RestExceptionMapper.class); + + @Override + public Response toResponse(Throwable e) { + if (log.isDebugEnabled()) { + log.debug("Uncaught exception in REST call: ", e); + } else if (log.isInfoEnabled()) { + log.info("Uncaught exception in REST call: {}", e.getMessage()); + } + if (e instanceof NotFoundException) { + return buildResponse(Response.Status.NOT_FOUND, e); + } else if (e instanceof InvalidTypeIdException) { // must be tested before JsonMappingException, which is its superclass + return buildResponse(Response.Status.NOT_IMPLEMENTED, e); + } else if (e instanceof JsonMappingException) { + return buildResponse(Response.Status.BAD_REQUEST, e); + } else if (e instanceof ClassNotFoundException) { + return buildResponse(Response.Status.NOT_IMPLEMENTED, e); + } else if (e instanceof SerializationException) { + return buildResponse(Response.Status.BAD_REQUEST, e); + } else { + return buildResponse(Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + public static Exception toException(int code, String msg) throws Exception { // NOTE: every branch throws; nothing is ever returned + if (code == Response.Status.NOT_FOUND.getStatusCode()) { + throw new NotFoundException(msg); + } else if (code == Response.Status.NOT_IMPLEMENTED.getStatusCode()) { + throw new ClassNotFoundException(msg); + } else if (code == Response.Status.BAD_REQUEST.getStatusCode()) { + throw new SerializationException(msg); + } else { + throw new RuntimeException(msg); + } + } + + private Response buildResponse(Response.Status code, Throwable e) { + return Response.status(code). 
+ entity(new ErrorResponse(code.getStatusCode(), e.getMessage())). + build(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java new file mode 100644 index 0000000..c587e44 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.trogdor.agent; + +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.trogdor.basic.BasicNode; +import org.apache.kafka.trogdor.basic.BasicPlatform; +import org.apache.kafka.trogdor.basic.BasicTopology; +import org.apache.kafka.trogdor.common.ExpectedFaults; +import org.apache.kafka.trogdor.common.Node; +import org.apache.kafka.trogdor.fault.FaultState; +import org.apache.kafka.trogdor.fault.NoOpFaultSpec; +import org.apache.kafka.trogdor.rest.AgentFaultsResponse; +import org.apache.kafka.trogdor.rest.AgentStatusResponse; +import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest; + +import org.apache.kafka.trogdor.rest.JsonRestServer; +import org.junit.Rule; +import org.junit.rules.Timeout; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.TreeMap; + +import static org.junit.Assert.assertEquals; + +public class AgentTest { + @Rule + final public Timeout globalTimeout = Timeout.millis(120000); + + private static BasicPlatform createBasicPlatform() { + TreeMap<String, Node> nodes = new TreeMap<>(); + HashMap<String, String> config = new HashMap<>(); + nodes.put("node01", new BasicNode("node01", "localhost", + config, Collections.<String>emptySet())); + BasicTopology topology = new BasicTopology(nodes); + return new BasicPlatform("node01", topology, new BasicPlatform.ShellCommandRunner()); + } + + private Agent createAgent(Time time) { + JsonRestServer restServer = new JsonRestServer(0); + AgentRestResource resource = new AgentRestResource(); + restServer.start(resource); + return new Agent(createBasicPlatform(), time, restServer, resource); + } + + @Test + public void testAgentStartShutdown() throws Exception { + Agent agent = createAgent(Time.SYSTEM); + agent.beginShutdown(); + agent.waitForShutdown(); + } + + @Test + public void testAgentProgrammaticShutdown() throws Exception { + Agent agent = 
createAgent(Time.SYSTEM); + AgentClient client = new AgentClient("localhost", agent.port()); + client.invokeShutdown(); + agent.waitForShutdown(); + } + + @Test + public void testAgentGetStatus() throws Exception { + Agent agent = createAgent(Time.SYSTEM); + AgentClient client = new AgentClient("localhost", agent.port()); + AgentStatusResponse status = client.getStatus(); + assertEquals(agent.startTimeMs(), status.startTimeMs()); + agent.beginShutdown(); + agent.waitForShutdown(); + } + + @Test + public void testAgentCreateFaults() throws Exception { + Time time = new MockTime(0, 0, 0); + Agent agent = createAgent(time); + AgentClient client = new AgentClient("localhost", agent.port()); + AgentFaultsResponse faults = client.getFaults(); + assertEquals(Collections.emptyMap(), faults.faults()); + new ExpectedFaults().waitFor(client); + + final NoOpFaultSpec fooSpec = new NoOpFaultSpec(1000, 600000); + client.putFault(new CreateAgentFaultRequest("foo", fooSpec)); + new ExpectedFaults().addFault("foo", fooSpec).waitFor(client); + + final NoOpFaultSpec barSpec = new NoOpFaultSpec(2000, 900000); + client.putFault(new CreateAgentFaultRequest("bar", barSpec)); + new ExpectedFaults(). + addFault("foo", fooSpec). + addFault("bar", barSpec). + waitFor(client); + + final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 450000); + client.putFault(new CreateAgentFaultRequest("baz", bazSpec)); + new ExpectedFaults(). + addFault("foo", fooSpec). + addFault("bar", barSpec). + addFault("baz", bazSpec). 
+ waitFor(client); + + agent.beginShutdown(); + agent.waitForShutdown(); + } + + @Test + public void testAgentActivatesFaults() throws Exception { + Time time = new MockTime(0, 0, 0); + Agent agent = createAgent(time); + AgentClient client = new AgentClient("localhost", agent.port()); + AgentFaultsResponse faults = client.getFaults(); + assertEquals(Collections.emptyMap(), faults.faults()); + new ExpectedFaults().waitFor(client); + + final NoOpFaultSpec fooSpec = new NoOpFaultSpec(10, 2); + client.putFault(new CreateAgentFaultRequest("foo", fooSpec)); + new ExpectedFaults().addFault("foo", FaultState.RUNNING).waitFor(client); + + final NoOpFaultSpec barSpec = new NoOpFaultSpec(20, 3); + client.putFault(new CreateAgentFaultRequest("bar", barSpec)); + time.sleep(11); + new ExpectedFaults(). + addFault("foo", FaultState.RUNNING). + addFault("bar", FaultState.RUNNING). + waitFor(client); + + final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 11); + client.putFault(new CreateAgentFaultRequest("baz", bazSpec)); + new ExpectedFaults(). + addFault("foo", FaultState.RUNNING). + addFault("bar", FaultState.RUNNING). + addFault("baz", FaultState.RUNNING). + waitFor(client); + + time.sleep(2); + new ExpectedFaults(). + addFault("foo", FaultState.DONE). + addFault("bar", FaultState.RUNNING). + addFault("baz", FaultState.DONE). + waitFor(client); + + time.sleep(100); + new ExpectedFaults(). + addFault("foo", FaultState.DONE). + addFault("bar", FaultState.DONE). + addFault("baz", FaultState.DONE). 
+ waitFor(client); + + agent.beginShutdown(); + agent.waitForShutdown(); + } +}; http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java b/tools/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java new file mode 100644 index 0000000..c0dd680 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.trogdor.basic; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; +import org.apache.kafka.trogdor.common.Platform; + +import org.junit.Rule; +import org.junit.rules.Timeout; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; + +import static org.junit.Assert.assertEquals; + +public class BasicPlatformTest { + @Rule + final public Timeout globalTimeout = Timeout.millis(120000); + + @Test + public void testCreateBasicPlatform() throws Exception { + File configFile = TestUtils.tempFile(); + try { + try (OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(configFile), + StandardCharsets.UTF_8)) { + writer.write("{\n"); + writer.write(" \"platform\": \"org.apache.kafka.trogdor.basic.BasicPlatform\",\n"); + writer.write(" \"nodes\": {\n"); + writer.write(" \"bob01\": {\n"); + writer.write(" \"hostname\": \"localhost\",\n"); + writer.write(" \"trogdor.agent.port\": 8888\n"); + writer.write(" },\n"); + writer.write(" \"bob02\": {\n"); + writer.write(" \"hostname\": \"localhost\",\n"); + writer.write(" \"trogdor.agent.port\": 8889\n"); + writer.write(" }\n"); + writer.write(" }\n"); + writer.write("}\n"); + } + Platform platform = Platform.Config.parse("bob01", configFile.getPath()); + assertEquals("BasicPlatform", platform.name()); + assertEquals(2, platform.topology().nodes().size()); + assertEquals("bob01, bob02", Utils.join(platform.topology().nodes().keySet(), ", ")); + } finally { + Files.delete(configFile.toPath()); + } + } +}; http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/test/java/org/apache/kafka/trogdor/common/CapturingCommandRunner.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/CapturingCommandRunner.java 
b/tools/src/test/java/org/apache/kafka/trogdor/common/CapturingCommandRunner.java new file mode 100644 index 0000000..2e5a660 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/CapturingCommandRunner.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.trogdor.common; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.trogdor.basic.BasicPlatform; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class CapturingCommandRunner implements BasicPlatform.CommandRunner { + private static final Logger log = LoggerFactory.getLogger(CapturingCommandRunner.class); + + private final Map<String, List<String>> commands = new HashMap<>(); + + private synchronized List<String> getOrCreate(String nodeName) { + List<String> lines = commands.get(nodeName); + if (lines != null) { + return lines; + } + lines = new LinkedList<>(); + commands.put(nodeName, lines); + return lines; + } + + @Override + public String run(Node curNode, String[] command) throws IOException { + String line = Utils.join(command, " "); + synchronized (this) { + getOrCreate(curNode.name()).add(line); + } + log.debug("RAN {}: {}", curNode, Utils.join(command, " ")); + return ""; + } + + public synchronized List<String> lines(String nodeName) { + return new ArrayList<String>(getOrCreate(nodeName)); + } +}; http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedFaults.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedFaults.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedFaults.java new file mode 100644 index 0000000..1fab903 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedFaults.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.common; + +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.apache.kafka.trogdor.agent.AgentClient; +import org.apache.kafka.trogdor.coordinator.CoordinatorClient; +import org.apache.kafka.trogdor.fault.FaultSpec; +import org.apache.kafka.trogdor.fault.FaultState; +import org.apache.kafka.trogdor.rest.AgentFaultsResponse; +import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.TreeMap; + +public class ExpectedFaults { + private static final Logger log = LoggerFactory.getLogger(ExpectedFaults.class); + + private static class FaultData { + final FaultSpec spec; + final FaultState state; + + FaultData(FaultSpec spec, FaultState state) { + this.spec = spec; + this.state = state; + } + } + + private interface FaultFetcher { + TreeMap<String, FaultData> fetch() throws Exception; + } + + private static class AgentFaultFetcher implements FaultFetcher { + private final AgentClient client; + + AgentFaultFetcher(AgentClient client) { + this.client = client; + } + + @Override + public TreeMap<String, FaultData> fetch() throws Exception { + TreeMap<String, FaultData> results = new TreeMap<>(); + AgentFaultsResponse response = client.getFaults(); + for (Map.Entry<String, AgentFaultsResponse.FaultData> entry 
: + response.faults().entrySet()) { + results.put(entry.getKey(), + new FaultData(entry.getValue().spec(), entry.getValue().state())); + } + return results; + } + } + + private static class CoordinatorFaultFetcher implements FaultFetcher { + private final CoordinatorClient client; + + CoordinatorFaultFetcher(CoordinatorClient client) { + this.client = client; + } + + @Override + public TreeMap<String, FaultData> fetch() throws Exception { + TreeMap<String, FaultData> results = new TreeMap<>(); + CoordinatorFaultsResponse response = client.getFaults(); + for (Map.Entry<String, CoordinatorFaultsResponse.FaultData> entry : + response.faults().entrySet()) { + results.put(entry.getKey(), + new FaultData(entry.getValue().spec(), entry.getValue().state())); + } + return results; + } + } + + private final TreeMap<String, FaultData> expected = new TreeMap<String, FaultData>(); + + public ExpectedFaults addFault(String id, FaultSpec spec) { + expected.put(id, new FaultData(spec, null)); + return this; + } + + public ExpectedFaults addFault(String id, FaultState state) { + expected.put(id, new FaultData(null, state)); + return this; + } + + public ExpectedFaults addFault(String id, FaultSpec spec, FaultState state) { + expected.put(id, new FaultData(spec, state)); + return this; + } + + public ExpectedFaults waitFor(AgentClient agentClient) throws InterruptedException { + waitFor(new AgentFaultFetcher(agentClient)); + return this; + } + + public ExpectedFaults waitFor(CoordinatorClient client) throws InterruptedException { + waitFor(new CoordinatorFaultFetcher(client)); + return this; + } + + private void waitFor(final FaultFetcher faultFetcher) throws InterruptedException { + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + TreeMap<String, FaultData> curData = null; + try { + curData = faultFetcher.fetch(); + } catch (Exception e) { + log.info("Got error fetching faults", e); + throw new RuntimeException(e); + } + StringBuilder 
errors = new StringBuilder(); + for (Map.Entry<String, FaultData> entry : expected.entrySet()) { + String id = entry.getKey(); + FaultData expectedFaultData = entry.getValue(); + FaultData curFaultData = curData.get(id); + if (curFaultData == null) { + errors.append("Did not find fault id " + id + "\n"); + } else { + if (expectedFaultData.spec != null) { + if (!expectedFaultData.spec.equals(curFaultData.spec)) { + errors.append("For fault id " + id + ", expected fault " + + "spec " + expectedFaultData.spec + ", but got " + + curFaultData.spec + "\n"); + } + } + if (expectedFaultData.state != null) { + if (!expectedFaultData.state.equals(curFaultData.state)) { + errors.append("For fault id " + id + ", expected fault " + + "state " + expectedFaultData.state + ", but got " + + curFaultData.state + "\n"); + } + } + } + } + for (String id : curData.keySet()) { + if (expected.get(id) == null) { + errors.append("Got unexpected fault id " + id + "\n"); + } + } + String errorString = errors.toString(); + if (!errorString.isEmpty()) { + log.info("EXPECTED FAULTS: {}", faultsToString(expected)); + log.info("ACTUAL FAULTS : {}", faultsToString(curData)); + log.info(errorString); + return false; + } + return true; + } + }, "Timed out waiting for expected fault specs " + faultsToString(expected)); + } + + private static String faultsToString(TreeMap<String, FaultData> faults) { + StringBuilder bld = new StringBuilder(); + bld.append("{"); + String faultsPrefix = ""; + for (Map.Entry<String, FaultData> entry : faults.entrySet()) { + String id = entry.getKey(); + bld.append(faultsPrefix).append(id).append(": {"); + faultsPrefix = ", "; + String faultValuesPrefix = ""; + FaultData faultData = entry.getValue(); + if (faultData.spec != null) { + bld.append(faultValuesPrefix).append("spec: ").append(faultData.spec); + faultValuesPrefix = ", "; + } + if (faultData.state != null) { + bld.append(faultValuesPrefix).append("state: ").append(faultData.state); + faultValuesPrefix = ", "; + } 
+ bld.append("}"); + } + bld.append("}"); + return bld.toString(); + } +};