AMBARI-2003 LogFeeder Simulator Enhancements for 3000-node cluster testing (mgergely)
Change-Id: I828c1804b4b118e2535da8c50ae0f4e7fc5798ce Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b68bb74c Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b68bb74c Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b68bb74c Branch: refs/heads/branch-feature-AMBARI-12556 Commit: b68bb74c47ee4a824d1b9d4fc243db31d185eb41 Parents: 9fdeec1 Author: Miklos Gergely <[email protected]> Authored: Thu Feb 23 15:08:33 2017 +0100 Committer: Miklos Gergely <[email protected]> Committed: Thu Feb 23 15:08:33 2017 +0100 ---------------------------------------------------------------------- .../org/apache/ambari/logfeeder/LogFeeder.java | 13 ++++++-- .../ambari/logfeeder/common/ConfigBlock.java | 2 +- .../apache/ambari/logfeeder/filter/Filter.java | 4 +-- .../ambari/logfeeder/input/InputSimulate.java | 31 ++++++++++++++++++-- 4 files changed, 42 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/b68bb74c/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java index 24651ba..d584890 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -245,7 +245,7 @@ public class LogFeeder { // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input if (output.getBooleanValue("is_enabled", true)) { - output.logConfgs(Level.INFO); + output.logConfigs(Level.INFO); outputManager.add(output); } else { LOG.info("Output is disabled. So ignoring it. " + output.getShortDescription()); @@ -277,7 +277,7 @@ public class LogFeeder { input.setOutputManager(outputManager); input.setInputManager(inputManager); inputManager.add(input); - input.logConfgs(Level.INFO); + input.logConfigs(Level.INFO); } else { LOG.info("Input is disabled. So ignoring it. " + input.getShortDescription()); } @@ -311,7 +311,7 @@ public class LogFeeder { if (filter.isEnabled()) { filter.setOutputManager(outputManager); input.addFilter(filter); - filter.logConfgs(Level.INFO); + filter.logConfigs(Level.INFO); } else { LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription()); } @@ -371,6 +371,13 @@ public class LogFeeder { } } } + + // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager + for (Output output : InputSimulate.getSimulateOutputs()) { + outputManager.add(output); + usedOutputSet.add(output); + } + outputManager.retainUsedOutputs(usedOutputSet); } http://git-wip-us.apache.org/repos/asf/ambari/blob/b68bb74c/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java index 47ddc51..68897e8 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java @@ -223,7 +223,7 @@ public abstract class ConfigBlock { logStatForMetric(statMetric, "Stat"); } - public boolean logConfgs(Priority level) { + public boolean logConfigs(Priority level) { if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) { return false; } http://git-wip-us.apache.org/repos/asf/ambari/blob/b68bb74c/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java index 684f3c4..afd903e 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java @@ -173,8 +173,8 @@ public abstract class Filter extends ConfigBlock { } @Override - public boolean logConfgs(Priority level) { - if (!super.logConfgs(level)) { + public boolean logConfigs(Priority level) { + if (!super.logConfigs(level)) { return false; } LOG.log(level, "input=" + input.getShortDescription()); http://git-wip-us.apache.org/repos/asf/ambari/blob/b68bb74c/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java index be97a52..2222f93 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java @@ -29,17 +29,21 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.filter.FilterJSON; +import org.apache.ambari.logfeeder.output.Output; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.log4j.Logger; import org.apache.solr.common.util.Base64; import com.google.common.base.Joiner; public class InputSimulate extends Input { + private static final Logger LOG = Logger.getLogger(InputSimulate.class); - private static final String LOG_TEXT_PATTERN = "{ logtime=\"%d\", level=\"%s\", log_message=\"%s\"}"; + private static final String LOG_TEXT_PATTERN = "{ logtime=\"%d\", level=\"%s\", log_message=\"%s\", host=\"%s\"}"; private static final Map<String, String> typeToFilePath = new HashMap<>(); public static void loadTypeToFilePath(List<Map<String, Object>> inputList) { @@ -52,6 +56,13 @@ public class InputSimulate extends Input { private static final Map<String, Integer> typeToLineNumber = new HashMap<>(); + private static final AtomicInteger hostNumber = new AtomicInteger(0); + + private static final List<Output> simulateOutputs = new ArrayList<>(); + public static List<Output> getSimulateOutputs() { + return simulateOutputs; + } + private final Random random = new Random(System.currentTimeMillis()); private final List<String> types; @@ -60,6 +71,7 @@ public class InputSimulate extends Input { private final int minLogWords; private final int maxLogWords; private final long sleepMillis; + private final String host; public InputSimulate() throws Exception { this.types = getSimulatedLogTypes(); @@ -68,6 +80,7 @@ public class InputSimulate extends Input { this.minLogWords = LogFeederUtil.getIntProperty("logfeeder.simulate.min_log_words", 5, 1, 10); this.maxLogWords = LogFeederUtil.getIntProperty("logfeeder.simulate.max_log_words", 10, 10, 20); this.sleepMillis = LogFeederUtil.getIntProperty("logfeeder.simulate.sleep_milliseconds", 10000); + this.host = "#" + hostNumber.incrementAndGet() + "-" + LogFeederUtil.hostName; Filter filter = new FilterJSON(); filter.loadConfig(Collections.<String, Object> emptyMap()); @@ -87,6 +100,20 @@ public class InputSimulate extends Input { } @Override + public void addOutput(Output output) { + try { + Class<? extends Output> clazz = output.getClass(); + Output outputCopy = clazz.newInstance(); + outputCopy.loadConfig(output.getConfigs()); + simulateOutputs.add(outputCopy); + super.addOutput(outputCopy); + } catch (Exception e) { + LOG.warn("Could not copy Output class " + output.getClass() + ", using original output"); + super.addOutput(output); + } + } + + @Override public boolean isReady() { return true; } @@ -143,7 +170,7 @@ public class InputSimulate extends Input { private String getLine() { Date d = new Date(); String logMessage = createLogMessage(); - return String.format(LOG_TEXT_PATTERN, d.getTime(), level, logMessage); + return String.format(LOG_TEXT_PATTERN, d.getTime(), level, logMessage, host); } private String createLogMessage() {
