AMBARI-17561. Enable simulating logfeeder inputs (Miklos Gergely via oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1a89d84c Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1a89d84c Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1a89d84c Branch: refs/heads/branch-2.4 Commit: 1a89d84cbddda2db2be960dafcdeb5500ba5b4e3 Parents: b02cdc7 Author: Miklos Gergely <[email protected]> Authored: Wed Jul 6 12:14:21 2016 +0200 Committer: oleewere <[email protected]> Committed: Wed Jul 6 12:26:28 2016 +0200 ---------------------------------------------------------------------- .../org/apache/ambari/logfeeder/LogFeeder.java | 21 ++- .../ambari/logfeeder/input/InputSimulate.java | 158 +++++++++++++++++++ .../src/main/resources/alias_config.json | 105 ++++++------ 3 files changed, 231 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/1a89d84c/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java index 88a6737..8697f54 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -40,6 +40,7 @@ import org.apache.ambari.logfeeder.AliasUtil.ALIAS_PARAM; import org.apache.ambari.logfeeder.AliasUtil.ALIAS_TYPE; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.input.Input; +import org.apache.ambari.logfeeder.input.InputSimulate; import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler; import org.apache.ambari.logfeeder.output.Output; import org.apache.ambari.logfeeder.util.FileUtil; @@ -118,6 +119,9 @@ public class LogFeeder { + configFileName); } } + + addSimulatedInputs(); + mergeAllConfigs(); LogfeederScheduler.INSTANCE.start(); @@ -196,7 +200,22 @@ public class LogFeeder { outputConfigList.addAll(mapList); } } - + } + + private void addSimulatedInputs() { + int simulatedInputNumber = LogFeederUtil.getIntProperty("logfeeder.simulate.input_number", 0); + if (simulatedInputNumber == 0) + return; + + InputSimulate.loadTypeToFilePath(inputConfigList); + inputConfigList.clear(); + + for (int i = 0; i < simulatedInputNumber; i++) { + HashMap<String, Object> mapList = new HashMap<String, Object>(); + mapList.put("source", "simulate"); + mapList.put("rowtype", "service"); + inputConfigList.add(mapList); + } } private void mergeAllConfigs() { http://git-wip-us.apache.org/repos/asf/ambari/blob/1a89d84c/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java new file mode 100644 index 0000000..72a0586 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.input; + +import java.net.Inet4Address; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.filter.Filter; +import org.apache.ambari.logfeeder.filter.JSONFilterCode; +import org.apache.commons.lang3.StringUtils; +import org.apache.solr.common.util.Base64; + +public class InputSimulate extends Input { + + private static final String LOG_MESSAGE_PREFIX = "Simulated log message for testing, line"; + + private static final String LOG_TEXT_PATTERN = + "{ logtime=\"%d\", level=\"%s\", log_message=\"<LOG_MESSAGE_PATTERN>\"}"; + + private static final Map<String, String> typeToFilePath = new HashMap<>(); + public static void loadTypeToFilePath(List<Map<String, Object>> inputList) { + for (Map<String, Object> input : inputList) { + if (input.containsKey("type") && input.containsKey("path")) { + typeToFilePath.put((String)input.get("type"), (String)input.get("path")); + } + } + } + + private static final Map<String, Integer> typeToLineNumber = new HashMap<>(); + + private final Random random = new Random(System.currentTimeMillis()); + + private final List<String> types; + private final String level; + private final String logText; + private final long sleepMillis; + + public InputSimulate() throws Exception { + this.types = getSimulatedLogTypes(); + this.level = LogFeederUtil.getStringProperty("logfeeder.simulate.log_level", "WARN"); + this.logText = getLogText(); + this.sleepMillis = LogFeederUtil.getIntProperty("logfeeder.simulate.sleep_milliseconds", 10000); + + Filter filter = new JSONFilterCode(); + filter.setInput(this); + setFirstFilter(filter); + } + + private List<String> getSimulatedLogTypes() { + String logsToSimulate = LogFeederUtil.getStringProperty("logfeeder.simulate.log_ids"); + if (logsToSimulate == null) { + return new ArrayList<>(typeToFilePath.keySet()); + } else { + List<String> simulatedLogTypes = Arrays.asList(logsToSimulate.split(",")); + simulatedLogTypes.retainAll(typeToFilePath.keySet()); + return simulatedLogTypes; + } + } + + private String getLogText() { + int logTextSize = LogFeederUtil.getIntProperty("logfeeder.simulate.log_message_size", 100); + int fillerSize = Math.max(logTextSize - LOG_MESSAGE_PREFIX.length() - 10, 0); + String filler = StringUtils.repeat("X", fillerSize); + String logMessagePattern = LOG_MESSAGE_PREFIX + " %08d " + filler; + + return LOG_TEXT_PATTERN.replaceAll("<LOG_MESSAGE_PATTERN>", logMessagePattern); + } + + @Override + public String getNameForThread() { + return "Simulated input"; + } + + @Override + public String getShortDescription() { + return "Simulated input"; + } + + @Override + void start() throws Exception { + if (types.isEmpty()) + return; + + getFirstFilter().setOutputMgr(outputMgr); + while (true) { + String type = imitateRandomLogFile(); + + InputMarker marker = getInputMarker(type); + String line = getLine(marker); + + outputLine(line, marker); + + try { Thread.sleep(sleepMillis); } catch(Exception e) {} + } + } + + private String imitateRandomLogFile() { + int typePos = random.nextInt(types.size()); + String type = types.get(typePos); + String filePath = typeToFilePath.get(type); + + configs.put("type", type); + setFilePath(filePath); + + return type; + } + + private InputMarker getInputMarker(String type) throws Exception { + InputMarker marker = new InputMarker(); + marker.input = this; + marker.lineNumber = getLineNumber(type); + marker.base64FileKey = getBase64FileKey(); + return marker; + } + + private static synchronized int getLineNumber(String type) { + if (!typeToLineNumber.containsKey(type)) { + typeToLineNumber.put(type, 0); + } + Integer lineNumber = typeToLineNumber.get(type) + 1; + + typeToLineNumber.put(type, lineNumber); + return lineNumber; + } + + private String getBase64FileKey() throws Exception { + String fileKey = Inet4Address.getLocalHost().getHostAddress() + "|" + filePath; + return Base64.byteArrayToBase64(fileKey.getBytes()); + } + + private String getLine(InputMarker marker) { + Date d = new Date(); + return String.format(logText, d.getTime(), level, marker.lineNumber); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/1a89d84c/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json index 978f581..bc221a0 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json @@ -1,54 +1,55 @@ { - "input": { - "file": { - "klass": "org.apache.ambari.logfeeder.input.InputFile" - }, - "s3_file": { - "klass": "org.apache.ambari.logfeeder.input.InputS3File" - } - - }, - "filter": { - "json": { - "klass": "org.apache.ambari.logfeeder.filter.JSONFilterCode" - }, - "keyvalue": { - "klass": "org.apache.ambari.logfeeder.filter.FilterKeyValue" - }, - "grok": { - "klass": "org.apache.ambari.logfeeder.filter.FilterGrok" - } - }, - - "mapper": { - "map_date": { - "klass": "org.apache.ambari.logfeeder.mapper.MapperDate" - }, - "map_fieldname": { - "klass": "org.apache.ambari.logfeeder.mapper.MapperFieldName" - }, - "map_fieldvalue": { - "klass": "org.apache.ambari.logfeeder.mapper.MapperFieldValue" - } - }, - "output": { - "solr": { - "klass": "org.apache.ambari.logfeeder.output.OutputSolr" - }, - "file": { - "klass": "org.apache.ambari.logfeeder.output.OutputFile" - }, - "kafka": { - "klass": "org.apache.ambari.logfeeder.output.OutputKafka" - }, - "dev_null": { - "klass": "org.apache.ambari.logfeeder.output.OutputDevNull" - }, - "s3_file": { - "klass": "org.apache.ambari.logfeeder.output.OutputS3File" - }, - "hdfs": { - "klass": "org.apache.ambari.logfeeder.output.OutputHDFSFile" - } - } +"input": { + "file": { + "klass": "org.apache.ambari.logfeeder.input.InputFile" + }, + "s3_file": { + "klass": "org.apache.ambari.logfeeder.input.InputS3File" + }, + "simulate": { + "klass": "org.apache.ambari.logfeeder.input.InputSimulate" + } + }, + "filter": { + "json": { + "klass": "org.apache.ambari.logfeeder.filter.JSONFilterCode" + }, + "keyvalue": { + "klass": "org.apache.ambari.logfeeder.filter.FilterKeyValue" + }, + "grok": { + "klass": "org.apache.ambari.logfeeder.filter.FilterGrok" + } + }, + "mapper": { + "map_date": { + "klass": "org.apache.ambari.logfeeder.mapper.MapperDate" + }, + "map_fieldname": { + "klass": "org.apache.ambari.logfeeder.mapper.MapperFieldName" + }, + "map_fieldvalue": { + "klass": "org.apache.ambari.logfeeder.mapper.MapperFieldValue" + } + }, + "output": { + "solr": { + "klass": "org.apache.ambari.logfeeder.output.OutputSolr" + }, + "file": { + "klass": "org.apache.ambari.logfeeder.output.OutputFile" + }, + "kafka": { + "klass": "org.apache.ambari.logfeeder.output.OutputKafka" + }, + "dev_null": { + "klass": "org.apache.ambari.logfeeder.output.OutputDevNull" + }, + "s3_file": { + "klass": "org.apache.ambari.logfeeder.output.OutputS3File" + }, + "hdfs": { + "klass": "org.apache.ambari.logfeeder.output.OutputHDFSFile" + } + } } \ No newline at end of file
