AMBARI-18236. Fix package structure in Logfeeder (Miklos Gergely via oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/080c1ba9 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/080c1ba9 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/080c1ba9 Branch: refs/heads/branch-dev-logsearch Commit: 080c1ba9cee8257478ad9bc1778d35f7fd400c49 Parents: ffcf532 Author: Miklos Gergely <[email protected]> Authored: Tue Aug 23 17:55:15 2016 +0200 Committer: oleewere <[email protected]> Committed: Tue Aug 23 17:55:15 2016 +0200 ---------------------------------------------------------------------- .../org/apache/ambari/logfeeder/AliasUtil.java | 99 ---- .../apache/ambari/logfeeder/ConfigBlock.java | 260 --------- .../org/apache/ambari/logfeeder/InputMgr.java | 451 --------------- .../org/apache/ambari/logfeeder/LogFeeder.java | 10 +- .../ambari/logfeeder/LogFeederAMSClient.java | 80 --- .../apache/ambari/logfeeder/LogFeederUtil.java | 556 ------------------ .../apache/ambari/logfeeder/MetricCount.java | 31 -- .../org/apache/ambari/logfeeder/MetricsMgr.java | 177 ------ .../org/apache/ambari/logfeeder/MurmurHash.java | 163 ------ .../org/apache/ambari/logfeeder/OutputMgr.java | 262 --------- .../ambari/logfeeder/common/ConfigBlock.java | 263 +++++++++ .../logfeeder/common/LogfeederException.java | 31 ++ .../logfeeder/exception/LogfeederException.java | 31 -- .../apache/ambari/logfeeder/filter/Filter.java | 16 +- .../ambari/logfeeder/filter/FilterGrok.java | 6 +- .../ambari/logfeeder/filter/FilterJSON.java | 4 +- .../ambari/logfeeder/filter/FilterKeyValue.java | 6 +- .../apache/ambari/logfeeder/input/Input.java | 9 +- .../ambari/logfeeder/input/InputFile.java | 2 +- .../apache/ambari/logfeeder/input/InputMgr.java | 451 +++++++++++++++ .../ambari/logfeeder/input/InputS3File.java | 4 +- .../ambari/logfeeder/input/InputSimulate.java | 2 +- .../logconfig/FetchConfigFromSolr.java | 2 +- .../logfeeder/logconfig/LogfeederScheduler.java | 2 +- 
.../logconfig/filter/ApplyLogFilter.java | 2 +- .../logconfig/filter/FilterLogData.java | 2 +- .../ambari/logfeeder/mapper/MapperDate.java | 2 +- .../logfeeder/mapper/MapperFieldName.java | 2 +- .../logfeeder/mapper/MapperFieldValue.java | 2 +- .../logfeeder/metrics/LogFeederAMSClient.java | 81 +++ .../ambari/logfeeder/metrics/MetricCount.java | 31 ++ .../ambari/logfeeder/metrics/MetricsMgr.java | 178 ++++++ .../apache/ambari/logfeeder/output/Output.java | 6 +- .../ambari/logfeeder/output/OutputFile.java | 2 +- .../ambari/logfeeder/output/OutputHDFSFile.java | 2 +- .../ambari/logfeeder/output/OutputKafka.java | 2 +- .../ambari/logfeeder/output/OutputMgr.java | 263 +++++++++ .../ambari/logfeeder/output/OutputS3File.java | 4 +- .../ambari/logfeeder/output/OutputSolr.java | 2 +- .../logfeeder/output/S3LogPathResolver.java | 4 +- .../logfeeder/output/S3OutputConfiguration.java | 4 +- .../ambari/logfeeder/output/S3Uploader.java | 4 +- .../org/apache/ambari/logfeeder/s3/AWSUtil.java | 84 --- .../org/apache/ambari/logfeeder/s3/S3Util.java | 186 ------- .../apache/ambari/logfeeder/util/AWSUtil.java | 84 +++ .../apache/ambari/logfeeder/util/AliasUtil.java | 99 ++++ .../ambari/logfeeder/util/LogFeederUtil.java | 557 +++++++++++++++++++ .../ambari/logfeeder/util/MurmurHash.java | 163 ++++++ .../apache/ambari/logfeeder/util/S3Util.java | 186 +++++++ .../apache/ambari/logfeeder/util/SolrUtil.java | 1 - .../ambari/logfeeder/filter/FilterGrokTest.java | 2 +- .../ambari/logfeeder/filter/FilterJSONTest.java | 6 +- .../logfeeder/filter/FilterKeyValueTest.java | 2 +- .../ambari/logfeeder/input/InputFileTest.java | 1 - .../ambari/logfeeder/mapper/MapperDateTest.java | 2 +- .../logfeeder/output/S3LogPathResolverTest.java | 3 +- .../ambari/logfeeder/output/S3UploaderTest.java | 2 +- .../apache/ambari/logfeeder/s3/AWSUtilTest.java | 27 - .../apache/ambari/logfeeder/s3/S3UtilTest.java | 38 -- .../ambari/logfeeder/util/AWSUtilTest.java | 29 + .../ambari/logfeeder/util/S3UtilTest.java | 40 
++ 61 files changed, 2519 insertions(+), 2504 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/AliasUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/AliasUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/AliasUtil.java deleted file mode 100644 index 44bc829..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/AliasUtil.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.ambari.logfeeder; - -import java.io.File; -import java.util.HashMap; - -import org.apache.log4j.Logger; - -public class AliasUtil { - - private static Logger logger = Logger.getLogger(AliasUtil.class); - - private static AliasUtil instance = null; - - private static String aliasConfigJson = "alias_config.json"; - - private HashMap<String, Object> aliasMap = null; - - public static enum ALIAS_TYPE { - INPUT, FILTER, MAPPER, OUTPUT - } - - public static enum ALIAS_PARAM { - KLASS - } - - private AliasUtil() { - init(); - } - - public static AliasUtil getInstance() { - if (instance == null) { - synchronized (AliasUtil.class) { - if (instance == null) { - instance = new AliasUtil(); - } - } - } - return instance; - } - - /** - */ - private void init() { - File jsonFile = LogFeederUtil.getFileFromClasspath(aliasConfigJson); - if (jsonFile != null) { - this.aliasMap = LogFeederUtil.readJsonFromFile(jsonFile); - } - - } - - - public String readAlias(String key, ALIAS_TYPE aliastype, ALIAS_PARAM aliasParam) { - String result = key;// key as a default value; - HashMap<String, String> aliasInfo = getAliasInfo(key, aliastype); - String value = aliasInfo.get(aliasParam.name().toLowerCase()); - if (value != null && !value.isEmpty()) { - result = value; - logger.debug("Alias found for key :" + key + ", param :" + aliasParam.name().toLowerCase() + ", value :" - + value + " aliastype:" + aliastype.name()); - } else { - logger.debug("Alias not found for key :" + key + ", param :" + aliasParam.name().toLowerCase()); - } - return result; - } - - @SuppressWarnings("unchecked") - private HashMap<String, String> getAliasInfo(String key, ALIAS_TYPE aliastype) { - HashMap<String, String> aliasInfo = null; - if (aliasMap != null) { - String typeKey = aliastype.name().toLowerCase(); - HashMap<String, Object> typeJson = (HashMap<String, Object>) aliasMap.get(typeKey); - if (typeJson != null) { - aliasInfo = (HashMap<String, String>) typeJson.get(key); - } - } - if 
(aliasInfo == null) { - aliasInfo = new HashMap<String, String>(); - } - return aliasInfo; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java deleted file mode 100644 index c3ccc47..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.ambari.logfeeder; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.commons.lang3.StringUtils; -import org.apache.log4j.Logger; -import org.apache.log4j.Priority; - - -public abstract class ConfigBlock { - static private Logger logger = Logger.getLogger(ConfigBlock.class); - - private boolean drain = false; - - protected Map<String, Object> configs; - protected Map<String, String> contextFields = new HashMap<String, String>(); - public MetricCount statMetric = new MetricCount(); - - /** - * - */ - public ConfigBlock() { - super(); - } - - /** - * Used while logging. Keep it short and meaningful - */ - public abstract String getShortDescription(); - - /** - * Every implementor need to give name to the thread they create - */ - public String getNameForThread() { - return this.getClass().getSimpleName(); - } - - /** - * @param metricsList - */ - public void addMetricsContainers(List<MetricCount> metricsList) { - metricsList.add(statMetric); - } - - /** - * This method needs to be overwritten by deriving classes. 
- */ - public void init() throws Exception { - } - - public void loadConfig(Map<String, Object> map) { - configs = LogFeederUtil.cloneObject(map); - - Map<String, String> nvList = getNVList("add_fields"); - if (nvList != null) { - contextFields.putAll(nvList); - } - } - - public Map<String, Object> getConfigs() { - return configs; - } - - @SuppressWarnings("unchecked") - public boolean isEnabled() { - boolean isEnabled = getBooleanValue("is_enabled", true); - if (isEnabled) { - // Let's check for static conditions - Map<String, Object> conditions = (Map<String, Object>) configs - .get("conditions"); - boolean allow = true; - if (conditions != null && conditions.size() > 0) { - allow = false; - for (String conditionType : conditions.keySet()) { - if (conditionType.equalsIgnoreCase("fields")) { - Map<String, Object> fields = (Map<String, Object>) conditions - .get("fields"); - for (String fieldName : fields.keySet()) { - Object values = fields.get(fieldName); - if (values instanceof String) { - allow = isFieldConditionMatch(fieldName, - (String) values); - } else { - List<String> listValues = (List<String>) values; - for (String stringValue : listValues) { - allow = isFieldConditionMatch(fieldName, - stringValue); - if (allow) { - break; - } - } - } - if (allow) { - break; - } - } - } - if (allow) { - break; - } - } - isEnabled = allow; - } - } - return isEnabled; - } - - public boolean isFieldConditionMatch(String fieldName, String stringValue) { - boolean allow = false; - String fieldValue = (String) configs.get(fieldName); - if (fieldValue != null && fieldValue.equalsIgnoreCase(stringValue)) { - allow = true; - } else { - @SuppressWarnings("unchecked") - Map<String, Object> addFields = (Map<String, Object>) configs - .get("add_fields"); - if (addFields != null && addFields.get(fieldName) != null) { - String addFieldValue = (String) addFields.get(fieldName); - if (stringValue.equalsIgnoreCase(addFieldValue)) { - allow = true; - } - } - - } - return allow; - } - - 
@SuppressWarnings("unchecked") - public Map<String, String> getNVList(String key) { - return (Map<String, String>) configs.get(key); - } - - public String getStringValue(String key) { - Object value = configs.get(key); - if (value != null && value.toString().equalsIgnoreCase("none")) { - value = null; - } - if (value != null) { - return value.toString(); - } - return null; - } - - public String getStringValue(String key, String defaultValue) { - Object value = configs.get(key); - if (value != null && value.toString().equalsIgnoreCase("none")) { - value = null; - } - - if (value != null) { - return value.toString(); - } - return defaultValue; - } - - public Object getConfigValue(String key) { - return configs.get(key); - } - - public boolean getBooleanValue(String key, boolean defaultValue) { - String strValue = getStringValue(key); - boolean retValue = defaultValue; - if (!StringUtils.isEmpty(strValue)) { - if (strValue.equalsIgnoreCase("true") - || strValue.equalsIgnoreCase("yes")) { - retValue = true; - } else { - retValue = false; - } - } - return retValue; - } - - public int getIntValue(String key, int defaultValue) { - String strValue = getStringValue(key); - int retValue = defaultValue; - if (!StringUtils.isEmpty(strValue)) { - try { - retValue = Integer.parseInt(strValue); - } catch (Throwable t) { - logger.error("Error parsing integer value. key=" + key - + ", value=" + strValue); - } - } - return retValue; - } - - public long getLongValue(String key, long defaultValue) { - String strValue = getStringValue(key); - Long retValue = defaultValue; - if (!StringUtils.isEmpty(strValue)) { - try { - retValue = Long.parseLong(strValue); - } catch (Throwable t) { - logger.error("Error parsing long value. 
key=" + key + ", value=" - + strValue); - } - } - return retValue; - } - - public Map<String, String> getContextFields() { - return contextFields; - } - - public void incrementStat(int count) { - statMetric.count += count; - } - - public void logStatForMetric(MetricCount metric, String prefixStr) { - LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" - + getShortDescription()); - } - - synchronized public void logStat() { - logStatForMetric(statMetric, "Stat"); - } - - public boolean logConfgs(Priority level) { - if (level.toInt() == Priority.INFO_INT && !logger.isInfoEnabled()) { - return false; - } - if (level.toInt() == Priority.DEBUG_INT && !logger.isDebugEnabled()) { - return false; - } - logger.log(level, "Printing configuration Block=" - + getShortDescription()); - logger.log(level, "configs=" + configs); - logger.log(level, "contextFields=" + contextFields); - return true; - } - - public boolean isDrain() { - return drain; - } - - public void setDrain(boolean drain) { - this.drain = drain; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java deleted file mode 100644 index fa60702..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java +++ /dev/null @@ -1,451 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder; - -import java.io.EOFException; -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import org.apache.ambari.logfeeder.input.Input; -import org.apache.ambari.logfeeder.input.InputFile; -import org.apache.commons.io.filefilter.WildcardFileFilter; -import org.apache.log4j.Logger; -import org.apache.solr.common.util.Base64; - -public class InputMgr { - private static final Logger logger = Logger.getLogger(InputMgr.class); - - private List<Input> inputList = new ArrayList<Input>(); - private Set<Input> notReadyList = new HashSet<Input>(); - - private boolean isDrain = false; - private boolean isAnyInputTail = false; - - private String checkPointSubFolderName = "logfeeder_checkpoints"; - private File checkPointFolderFile = null; - - private MetricCount filesCountMetric = new MetricCount(); - - private String checkPointExtension = ".cp"; - - private Thread inputIsReadyMonitor = null; - - public List<Input> getInputList() { - return inputList; - } - - public void add(Input input) { - inputList.add(input); - } - - public void removeInput(Input input) { - logger.info("Trying to remove from inputList. 
" - + input.getShortDescription()); - Iterator<Input> iter = inputList.iterator(); - while (iter.hasNext()) { - Input iterInput = iter.next(); - if (iterInput.equals(input)) { - logger.info("Removing Input from inputList. " - + input.getShortDescription()); - iter.remove(); - } - } - } - - public int getActiveFilesCount() { - int count = 0; - for (Input input : inputList) { - if (input.isReady()) { - count++; - } - } - return count; - } - - public void init() { - filesCountMetric.metricsName = "input.files.count"; - filesCountMetric.isPointInTime = true; - - checkPointExtension = LogFeederUtil.getStringProperty( - "logfeeder.checkpoint.extension", checkPointExtension); - for (Input input : inputList) { - try { - input.init(); - if (input.isTail()) { - isAnyInputTail = true; - } - } catch (Exception e) { - logger.error( - "Error initializing input. " - + input.getShortDescription(), e); - } - } - - if (isAnyInputTail) { - logger.info("Determining valid checkpoint folder"); - boolean isCheckPointFolderValid = false; - // We need to keep track of the files we are reading. - String checkPointFolder = LogFeederUtil - .getStringProperty("logfeeder.checkpoint.folder"); - if (checkPointFolder != null && !checkPointFolder.isEmpty()) { - checkPointFolderFile = new File(checkPointFolder); - isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); - } - if (!isCheckPointFolderValid) { - // Let's try home folder - String userHome = LogFeederUtil.getStringProperty("user.home"); - if (userHome != null) { - checkPointFolderFile = new File(userHome, - checkPointSubFolderName); - logger.info("Checking if home folder can be used for checkpoints. 
Folder=" - + checkPointFolderFile); - isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); - } - } - if (!isCheckPointFolderValid) { - // Let's use tmp folder - String tmpFolder = LogFeederUtil - .getStringProperty("java.io.tmpdir"); - if (tmpFolder == null) { - tmpFolder = "/tmp"; - } - checkPointFolderFile = new File(tmpFolder, - checkPointSubFolderName); - logger.info("Checking if tmps folder can be used for checkpoints. Folder=" - + checkPointFolderFile); - isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); - if (isCheckPointFolderValid) { - logger.warn("Using tmp folder " - + checkPointFolderFile - + " to store check points. This is not recommended." - + "Please set logfeeder.checkpoint.folder property"); - } - } - - if (isCheckPointFolderValid) { - logger.info("Using folder " + checkPointFolderFile - + " for storing checkpoints"); - } - } - - } - - public File getCheckPointFolderFile() { - return checkPointFolderFile; - } - - private boolean verifyCheckPointFolder(File folderPathFile) { - if (!folderPathFile.exists()) { - // Create the folder - try { - if (!folderPathFile.mkdir()) { - logger.warn("Error creating folder for check point. folder=" - + folderPathFile); - } - } catch (Throwable t) { - logger.warn("Error creating folder for check point. folder=" - + folderPathFile, t); - } - } - - if (folderPathFile.exists() && folderPathFile.isDirectory()) { - // Let's check whether we can create a file - File testFile = new File(folderPathFile, UUID.randomUUID() - .toString()); - try { - testFile.createNewFile(); - return testFile.delete(); - } catch (IOException e) { - logger.warn( - "Couldn't create test file in " - + folderPathFile.getAbsolutePath() - + " for checkPoint", e); - } - } - return false; - } - - public void monitor() { - for (Input input : inputList) { - if (input.isReady()) { - input.monitor(); - } else { - if (input.isTail()) { - logger.info("Adding input to not ready list. 
Note, it is possible this component is not run on this host. So it might not be an issue. " - + input.getShortDescription()); - notReadyList.add(input); - } else { - logger.info("Input is not ready, so going to ignore it " - + input.getShortDescription()); - } - } - } - // Start the monitoring thread if any file is in tail mode - if (isAnyInputTail) { - inputIsReadyMonitor = new Thread("InputIsReadyMonitor") { - @Override - public void run() { - logger.info("Going to monitor for these missing files: " - + notReadyList.toString()); - while (true) { - if (isDrain) { - logger.info("Exiting missing file monitor."); - break; - } - try { - Iterator<Input> iter = notReadyList.iterator(); - while (iter.hasNext()) { - Input input = iter.next(); - try { - if (input.isReady()) { - input.monitor(); - iter.remove(); - } - } catch (Throwable t) { - logger.error("Error while enabling monitoring for input. " - + input.getShortDescription()); - } - } - Thread.sleep(30 * 1000); - } catch (Throwable t) { - // Ignore - } - } - } - }; - inputIsReadyMonitor.start(); - } - } - - public void addToNotReady(Input notReadyInput) { - notReadyList.add(notReadyInput); - } - - public void addMetricsContainers(List<MetricCount> metricsList) { - for (Input input : inputList) { - input.addMetricsContainers(metricsList); - } - filesCountMetric.count = getActiveFilesCount(); - metricsList.add(filesCountMetric); - } - - public void logStats() { - for (Input input : inputList) { - input.logStat(); - } - - filesCountMetric.count = getActiveFilesCount(); - LogFeederUtil.logStatForMetric(filesCountMetric, - "Stat: Files Monitored Count", null); - } - - public void close() { - for (Input input : inputList) { - try { - input.setDrain(true); - } catch (Throwable t) { - logger.error( - "Error while draining. 
input=" - + input.getShortDescription(), t); - } - } - isDrain = true; - - // Need to get this value from property - int iterations = 30; - int waitTimeMS = 1000; - int i = 0; - boolean allClosed = true; - for (i = 0; i < iterations; i++) { - allClosed = true; - for (Input input : inputList) { - if (!input.isClosed()) { - try { - allClosed = false; - logger.warn("Waiting for input to close. " - + input.getShortDescription() + ", " - + (iterations - i) + " more seconds"); - Thread.sleep(waitTimeMS); - } catch (Throwable t) { - // Ignore - } - } - } - if (allClosed) { - break; - } - } - if (!allClosed) { - logger.warn("Some inputs were not closed. Iterations=" + i); - for (Input input : inputList) { - if (!input.isClosed()) { - logger.warn("Input not closed. Will ignore it." - + input.getShortDescription()); - } - } - } else { - logger.info("All inputs are closed. Iterations=" + i); - } - - } - - public void checkInAll() { - for (Input input : inputList) { - input.checkIn(); - } - } - - public void cleanCheckPointFiles() { - - if (checkPointFolderFile == null) { - logger.info("Will not clean checkPoint files. checkPointFolderFile=" - + checkPointFolderFile); - return; - } - logger.info("Cleaning checkPoint files. 
checkPointFolderFile=" - + checkPointFolderFile.getAbsolutePath()); - try { - // Loop over the check point files and if filePath is not present, then move to closed - String searchPath = "*" + checkPointExtension; - FileFilter fileFilter = new WildcardFileFilter(searchPath); - File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter); - int totalCheckFilesDeleted = 0; - for (File checkPointFile : checkPointFiles) { - RandomAccessFile checkPointReader = null; - try { - checkPointReader = new RandomAccessFile(checkPointFile, "r"); - - int contentSize = checkPointReader.readInt(); - byte b[] = new byte[contentSize]; - int readSize = checkPointReader.read(b, 0, contentSize); - if (readSize != contentSize) { - logger.error("Couldn't read expected number of bytes from checkpoint file. expected=" - + contentSize - + ", read=" - + readSize - + ", checkPointFile=" + checkPointFile); - } else { - // Create JSON string - String jsonCheckPointStr = new String(b, 0, readSize); - Map<String, Object> jsonCheckPoint = LogFeederUtil - .toJSONObject(jsonCheckPointStr); - - String logFilePath = (String) jsonCheckPoint - .get("file_path"); - String logFileKey = (String) jsonCheckPoint - .get("file_key"); - if (logFilePath != null && logFileKey != null) { - boolean deleteCheckPointFile = false; - File logFile = new File(logFilePath); - if (logFile.exists()) { - Object fileKeyObj = InputFile - .getFileKey(logFile); - String fileBase64 = Base64 - .byteArrayToBase64(fileKeyObj - .toString().getBytes()); - if (!logFileKey.equals(fileBase64)) { - deleteCheckPointFile = true; - logger.info("CheckPoint clean: File key has changed. old=" - + logFileKey - + ", new=" - + fileBase64 - + ", filePath=" - + logFilePath - + ", checkPointFile=" - + checkPointFile.getAbsolutePath()); - } - } else { - logger.info("CheckPoint clean: Log file doesn't exist. 
filePath=" - + logFilePath - + ", checkPointFile=" - + checkPointFile.getAbsolutePath()); - deleteCheckPointFile = true; - } - if (deleteCheckPointFile) { - logger.info("Deleting CheckPoint file=" - + checkPointFile.getAbsolutePath() - + ", logFile=" + logFilePath); - checkPointFile.delete(); - totalCheckFilesDeleted++; - } - } - } - } catch (EOFException eof) { - logger.warn("Caught EOFException. Ignoring reading existing checkPoint file. " - + checkPointFile); - } catch (Throwable t) { - logger.error("Error while checking checkPoint file. " - + checkPointFile, t); - } finally { - if (checkPointReader != null) { - try { - checkPointReader.close(); - } catch (Throwable t) { - logger.error("Error closing checkPoint file. " - + checkPointFile, t); - } - } - } - } - logger.info("Deleted " + totalCheckFilesDeleted - + " checkPoint file(s). checkPointFolderFile=" - + checkPointFolderFile.getAbsolutePath()); - - } catch (Throwable t) { - logger.error("Error while cleaning checkPointFiles", t); - } - } - - public void waitOnAllInputs() { - //wait on inputs - if (inputList != null) { - for (Input input : inputList) { - if (input != null) { - Thread inputThread = input.getThread(); - if (inputThread != null) { - try { - inputThread.join(); - } catch (InterruptedException e) { - // ignore - } - } - } - } - } - // wait on monitor - if (inputIsReadyMonitor != null) { - try { - this.close(); - inputIsReadyMonitor.join(); - } catch (InterruptedException e) { - // ignore - } - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java index 3cf0fff..373d743 100644 --- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -37,14 +37,20 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.ambari.logfeeder.AliasUtil.ALIAS_PARAM; -import org.apache.ambari.logfeeder.AliasUtil.ALIAS_TYPE; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.input.Input; +import org.apache.ambari.logfeeder.input.InputMgr; import org.apache.ambari.logfeeder.input.InputSimulate; import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler; +import org.apache.ambari.logfeeder.metrics.MetricCount; +import org.apache.ambari.logfeeder.metrics.MetricsMgr; import org.apache.ambari.logfeeder.output.Output; +import org.apache.ambari.logfeeder.output.OutputMgr; +import org.apache.ambari.logfeeder.util.AliasUtil; import org.apache.ambari.logfeeder.util.FileUtil; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_PARAM; +import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_TYPE; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.log4j.Level; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java deleted file mode 100644 index da61d83..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder; - -import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.log4j.Logger; - -// TODO: Refactor for failover -public class LogFeederAMSClient extends AbstractTimelineMetricsSink { - private static final Logger logger = Logger.getLogger(LogFeederAMSClient.class); - - private String collectorHosts = null; - - public LogFeederAMSClient() { - collectorHosts = LogFeederUtil - .getStringProperty("logfeeder.metrics.collector.hosts"); - if (collectorHosts != null && collectorHosts.trim().length() == 0) { - collectorHosts = null; - } - if (collectorHosts != null) { - collectorHosts = collectorHosts.trim(); - } - logger.info("AMS collector URL=" + collectorHosts); - } - - @Override - public String getCollectorUri(String host) { - return collectorHosts; - } - - @Override - protected int getTimeoutSeconds() { - // TODO: Hard coded timeout - return 10; - } - - @Override - protected String getZookeeperQuorum() { - return null; - } - - @Override - protected String getConfiguredCollectors() { - return null; - } - - @Override - protected String getHostname() { 
- return null; - } - - @Override - protected boolean emitMetrics(TimelineMetrics metrics) { - return super.emitMetrics(metrics); - } - - @Override - protected String getCollectorProtocol() { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java deleted file mode 100644 index a86d989..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java +++ /dev/null @@ -1,556 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
/**
 * This class contains utility methods used by LogFeeder.
 *
 * All state is static: the class acts as the process-wide holder for the
 * effective configuration (system properties + properties file + CLI
 * overrides), the local host name/IP, JSON (de)serialization helpers and
 * rate-limited error logging.
 */
public class LogFeederUtil {
  private static final Logger logger = Logger.getLogger(LogFeederUtil.class);

  // Seed fed to MurmurHash in genHash().
  private static final int HASH_SEED = 31174077;
  public final static String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
  public final static String SOLR_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
  private static Gson gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();

  // Effective configuration; populated by loadProperties().
  private static Properties props;

  // Per-key suppression history for logErrorMessageByInterval();
  // Hashtable gives (coarse) thread safety for concurrent loggers.
  private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
  private static int logInterval = 30000; // 30 seconds

  // Resolved once in the static initializer via setHostNameAndIP().
  public static String hostName = null;
  public static String ipAddress = null;

  // Lazily resolved in getLogfeederTempDir(), guarded by _LOCK.
  private static String logfeederTempDir = null;

  private static final Object _LOCK = new Object();

  static{
    setHostNameAndIP();
  }

  public static Gson getGson() {
    return gson;
  }

  // SimpleDateFormat is not thread-safe, hence one instance per thread.
  private static ThreadLocal<SimpleDateFormat> dateFormatter = new ThreadLocal<SimpleDateFormat>() {
    @Override
    protected SimpleDateFormat initialValue() {
      SimpleDateFormat sdf = new SimpleDateFormat(SOLR_DATE_FORMAT);
      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
      return sdf;
    }
  };

  /**
   * This method will read the properties from System, followed by propFile
   * and finally from the map.
   *
   * Resolution order: a file named by the "properties" system property wins;
   * otherwise propFile is loaded from the classpath; finally propNVList
   * ("-name=value" CLI arguments) overrides individual entries.
   *
   * @throws Exception if no properties source could be loaded at all
   */
  public static void loadProperties(String propFile, String[] propNVList)
    throws Exception {
    logger.info("Loading properties. propFile=" + propFile);
    props = new Properties(System.getProperties());
    boolean propLoaded = false;

    // First get properties file path from environment value
    String propertiesFilePath = System.getProperty("properties");
    if (propertiesFilePath != null && !propertiesFilePath.isEmpty()) {
      File propertiesFile = new File(propertiesFilePath);
      if (propertiesFile.exists() && propertiesFile.isFile()) {
        logger.info("Properties file path set in environment. Loading properties file="
          + propertiesFilePath);
        FileInputStream fileInputStream = null;
        try {
          fileInputStream = new FileInputStream(propertiesFile);
          props.load(fileInputStream);
          propLoaded = true;
        } catch (Throwable t) {
          logger.error("Error loading properties file. properties file="
            + propertiesFile.getAbsolutePath());
        } finally {
          if (fileInputStream != null) {
            try {
              fileInputStream.close();
            } catch (Throwable t) {
              // Ignore error
            }
          }
        }
      } else {
        logger.error("Properties file path set in environment, but file not found. properties file="
          + propertiesFilePath);
      }
    }

    if (!propLoaded) {
      BufferedInputStream fileInputStream = null;
      try {
        // Properties not yet loaded, let's try from class loader
        fileInputStream = (BufferedInputStream) LogFeeder.class
          .getClassLoader().getResourceAsStream(propFile);
        if (fileInputStream != null) {
          logger.info("Loading properties file " + propFile
            + " from classpath");
          props.load(fileInputStream);
          propLoaded = true;
        } else {
          logger.fatal("Properties file not found in classpath. properties file name= "
            + propFile);
        }
      } finally {
        if (fileInputStream != null) {
          try {
            fileInputStream.close();
          } catch (IOException e) {
            // Ignore close failure; the stream was only read from.
          }
        }
      }
    }

    if (!propLoaded) {
      logger.fatal("Properties file is not loaded.");
      throw new Exception("Properties not loaded");
    } else {
      updatePropertiesFromMap(propNVList);
    }
  }

  /**
   * Applies "-name=value" style command-line arguments on top of the already
   * loaded properties. Entries not starting with '-' are silently skipped.
   */
  private static void updatePropertiesFromMap(String[] nvList) {
    if (nvList == null) {
      return;
    }
    logger.info("Trying to load additional proeprties from argument paramters. nvList.length="
      + nvList.length);
    if (nvList != null && nvList.length > 0) {
      for (String nv : nvList) {
        logger.info("Passed nv=" + nv);
        if (nv.startsWith("-") && nv.length() > 1) {
          nv = nv.substring(1);
          logger.info("Stripped nv=" + nv);
          int i = nv.indexOf("=");
          if (nv.length() > i) {
            logger.info("Candidate nv=" + nv);
            String name = nv.substring(0, i);
            String value = nv.substring(i + 1);
            logger.info("Adding property from argument to properties. name="
              + name + ", value=" + value);
            props.put(name, value);
          }
        }
      }
    }
  }

  /** Returns the property value, or null if properties are not loaded / key absent. */
  static public String getStringProperty(String key) {
    if (props != null) {
      return props.getProperty(key);
    }
    return null;
  }

  /** Returns the property value, or defaultValue if properties are not loaded / key absent. */
  static public String getStringProperty(String key, String defaultValue) {
    if (props != null) {
      return props.getProperty(key, defaultValue);
    }
    return defaultValue;
  }

  /** Reads a boolean property; "true"/"yes" (case-insensitive) count as true. */
  static public boolean getBooleanProperty(String key, boolean defaultValue) {
    String strValue = getStringProperty(key);
    return toBoolean(strValue, defaultValue);
  }

  // "true"/"yes" -> true, any other non-empty value -> false, empty/null -> default.
  private static boolean toBoolean(String strValue, boolean defaultValue) {
    boolean retValue = defaultValue;
    if (!StringUtils.isEmpty(strValue)) {
      if (strValue.equalsIgnoreCase("true")
        || strValue.equalsIgnoreCase("yes")) {
        retValue = true;
      } else {
        retValue = false;
      }
    }
    return retValue;
  }

  /** Reads an int property, falling back to defaultValue on absence or parse error. */
  static public int getIntProperty(String key, int defaultValue) {
    String strValue = getStringProperty(key);
    int retValue = defaultValue;
    retValue = objectToInt(strValue, retValue, ", key=" + key);
    return retValue;
  }

  /**
   * Parses objValue.toString() as an int; on failure logs errMessage context
   * and returns retValue unchanged.
   */
  public static int objectToInt(Object objValue, int retValue,
    String errMessage) {
    if (objValue == null) {
      return retValue;
    }
    String strValue = objValue.toString();
    if (!StringUtils.isEmpty(strValue)) {
      try {
        retValue = Integer.parseInt(strValue);
      } catch (Throwable t) {
        logger.error("Error parsing integer value. str=" + strValue
          + ", " + errMessage);
      }
    }
    return retValue;
  }

  /**
   * Evaluates the "conditions" block of a config: when conditions are present,
   * the block is enabled only if at least one configured field condition
   * matches valueConfigs; otherwise the "is_enabled" flag (default true) wins.
   */
  public static boolean isEnabled(Map<String, Object> conditionConfigs,
    Map<String, Object> valueConfigs) {
    boolean allow = toBoolean((String) valueConfigs.get("is_enabled"), true);
    @SuppressWarnings("unchecked")
    Map<String, Object> conditions = (Map<String, Object>) conditionConfigs
      .get("conditions");
    if (conditions != null && conditions.size() > 0) {
      allow = false;
      for (String conditionType : conditions.keySet()) {
        if (conditionType.equalsIgnoreCase("fields")) {
          @SuppressWarnings("unchecked")
          Map<String, Object> fields = (Map<String, Object>) conditions
            .get("fields");
          for (String fieldName : fields.keySet()) {
            Object values = fields.get(fieldName);
            if (values instanceof String) {
              allow = isFieldConditionMatch(valueConfigs,
                fieldName, (String) values);
            } else {
              // Otherwise a list of acceptable values; any single match wins.
              @SuppressWarnings("unchecked")
              List<String> listValues = (List<String>) values;
              for (String stringValue : listValues) {
                allow = isFieldConditionMatch(valueConfigs,
                  fieldName, stringValue);
                if (allow) {
                  break;
                }
              }
            }
            if (allow) {
              break;
            }
          }
        }
        if (allow) {
          break;
        }
      }
    }
    return allow;
  }

  /**
   * True when configs[fieldName] (or configs["add_fields"][fieldName]) equals
   * stringValue, compared case-insensitively.
   */
  public static boolean isFieldConditionMatch(Map<String, Object> configs,
    String fieldName, String stringValue) {
    boolean allow = false;
    String fieldValue = (String) configs.get(fieldName);
    if (fieldValue != null && fieldValue.equalsIgnoreCase(stringValue)) {
      allow = true;
    } else {
      @SuppressWarnings("unchecked")
      Map<String, Object> addFields = (Map<String, Object>) configs
        .get("add_fields");
      if (addFields != null && addFields.get(fieldName) != null) {
        String addFieldValue = (String) addFields.get(fieldName);
        if (stringValue.equalsIgnoreCase(addFieldValue)) {
          allow = true;
        }
      }

    }
    return allow;
  }

  /**
   * Logs the metric's progress since the previous call (count delta and
   * elapsed time) when the count increased, then updates the metric's
   * prevLogCount/prevLogMS bookkeeping fields.
   */
  public static void logStatForMetric(MetricCount metric, String prefixStr,
    String postFix) {
    long currStat = metric.count;
    long currMS = System.currentTimeMillis();
    if (currStat > metric.prevLogCount) {
      if (postFix == null) {
        postFix = "";
      }
      logger.info(prefixStr + ": total_count=" + metric.count
        + ", duration=" + (currMS - metric.prevLogMS) / 1000
        + " secs, count=" + (currStat - metric.prevLogCount)
        + postFix);
    }
    metric.prevLogCount = currStat;
    metric.prevLogMS = currMS;
  }

  /** Deep-copies a map via a JSON round trip; returns null for null input. */
  public static Map<String, Object> cloneObject(Map<String, Object> map) {
    if (map == null) {
      return null;
    }
    String jsonStr = gson.toJson(map);
    Type type = new TypeToken<Map<String, Object>>() {
    }.getType();
    return gson.fromJson(jsonStr, type);
  }

  /** Parses a JSON object string into a map; blank input yields an empty map. */
  public static Map<String, Object> toJSONObject(String jsonStr) {
    if(jsonStr==null || jsonStr.trim().isEmpty()){
      return new HashMap<String, Object>();
    }
    Type type = new TypeToken<Map<String, Object>>() {
    }.getType();
    return gson.fromJson(jsonStr, type);
  }

  /**
   * Rate-limited logging: emits the message at most once per logInterval per
   * key, appending the number of messages suppressed in between.
   *
   * @return true if the message was actually logged, false if suppressed
   */
  static public boolean logErrorMessageByInterval(String key, String message,
    Throwable e, Logger callerLogger, Level level) {

    LogHistory log = logHistoryList.get(key);
    if (log == null) {
      log = new LogHistory();
      logHistoryList.put(key, log);
    }
    if ((System.currentTimeMillis() - log.lastLogTime) > logInterval) {
      log.lastLogTime = System.currentTimeMillis();
      int counter = log.counter;
      log.counter = 0;
      if (counter > 0) {
        message += ". Messages suppressed before: " + counter;
      }
      if (e == null) {
        callerLogger.log(level, message);
      } else {
        callerLogger.log(level, message, e);
      }

      return true;
    } else {
      log.counter++;
    }
    return false;

  }

  /** Null-safe prefix of str, at most maxLength characters ("" for null/empty). */
  static public String subString(String str, int maxLength) {
    if (str == null || str.length() == 0) {
      return "";
    }
    maxLength = str.length() < maxLength ? str.length() : maxLength;
    return str.substring(0, maxLength);
  }

  /** 64-bit MurmurHash of the value's bytes with the fixed HASH_SEED; null hashes as "null". */
  public static long genHash(String value) {
    if (value == null) {
      value = "null";
    }
    return MurmurHash.hash64A(value.getBytes(), HASH_SEED);
  }

  // Suppression bookkeeping for one logErrorMessageByInterval() key.
  private static class LogHistory {
    private long lastLogTime = 0;
    private int counter = 0;
  }

  /** Formats an epoch-millis string as a Solr UTC timestamp, or null on failure. */
  public static String getDate(String timeStampStr) {
    try {
      return dateFormatter.get().format(new Date(Long.parseLong(timeStampStr)));
    } catch (Exception ex) {
      logger.error(ex);
      return null;
    }
  }

  /** Current time formatted as a Solr UTC timestamp, or null on failure. */
  public static String getActualDateStr() {
    try {
      return dateFormatter.get().format(new Date());
    } catch (Exception ex) {
      logger.error(ex);
      return null;
    }
  }

  /**
   * Resolves a classpath resource to a File via the context class loader.
   * Returns null when the resource is missing or not addressable as a file.
   */
  public static File getFileFromClasspath(String filename) {
    URL fileCompleteUrl = Thread.currentThread().getContextClassLoader()
      .getResource(filename);
    logger.debug("File Complete URI :" + fileCompleteUrl);
    File file = null;
    try {
      file = new File(fileCompleteUrl.toURI());
    } catch (Exception exception) {
      logger.debug(exception.getMessage(), exception.getCause());
    }
    return file;
  }

  /**
   * Instantiates classFullName via its no-arg constructor and validates that
   * the instance matches the expected plugin base type for aliasType.
   * Logs (but does not throw) on failure; may return null or a non-matching
   * instance - callers are expected to check.
   */
  public static Object getClassInstance(String classFullName, AliasUtil.ALIAS_TYPE aliasType) {
    Object instance = null;
    try {
      instance = (Object) Class.forName(classFullName).getConstructor().newInstance();
    } catch (Exception exception) {
      logger.error("Unsupported class =" + classFullName, exception.getCause());
    }
    // check instance class as par aliasType
    if (instance != null) {
      boolean isValid = false;
      switch (aliasType) {
      case FILTER:
        isValid = Filter.class.isAssignableFrom(instance.getClass());
        break;
      case INPUT:
        isValid = Input.class.isAssignableFrom(instance.getClass());
        break;
      case OUTPUT:
        isValid = Output.class.isAssignableFrom(instance.getClass());
        break;
      case MAPPER:
        isValid = Mapper.class.isAssignableFrom(instance.getClass());
        break;
      default:
        // by default consider all are valid class
        isValid = true;
      }
      if (!isValid) {
        logger.error("Not a valid class :" + classFullName + " AliasType :" + aliasType.name());
      }
    }
    return instance;
  }

  /** Reads a JSON file into a map; returns an empty map (after logging) on any error. */
  public static HashMap<String, Object> readJsonFromFile(File jsonFile) {
    ObjectMapper mapper = new ObjectMapper();
    try {
      HashMap<String, Object> jsonmap = mapper.readValue(jsonFile, new TypeReference<HashMap<String, Object>>() {
      });
      return jsonmap;
    } catch (JsonParseException e) {
      logger.error(e, e.getCause());
    } catch (JsonMappingException e) {
      logger.error(e, e.getCause());
    } catch (IOException e) {
      logger.error(e, e.getCause());
    }
    return new HashMap<String, Object>();
  }

  /**
   * True when list contains str (case handling per caseSensitive) or contains
   * the LogFeederConstants.ALL wildcard. Note: the ALL wildcard is matched
   * case-insensitively even in caseSensitive mode.
   */
  public static boolean isListContains(List<String> list, String str, boolean caseSensitive) {
    if (list != null) {
      for (String value : list) {
        if (value != null) {
          if (caseSensitive) {
            if (value.equals(str)) {
              return true;
            }
          } else {
            if (value.equalsIgnoreCase(str)) {
              return true;
            }
          }
          if (value.equalsIgnoreCase(LogFeederConstants.ALL)) {
            return true;
          }
        }
      }
    }
    return false;
  }

  /**
   * Resolves and caches the local host name and IP address, preferring the
   * canonical host name when it differs from the bare IP.
   */
  private static synchronized String setHostNameAndIP() {
    if (hostName == null || ipAddress == null) {
      try {
        InetAddress ip = InetAddress.getLocalHost();
        ipAddress = ip.getHostAddress();
        String getHostName = ip.getHostName();
        String getCanonicalHostName = ip.getCanonicalHostName();
        if (!getCanonicalHostName.equalsIgnoreCase(ipAddress)) {
          logger.info("Using getCanonicalHostName()=" + getCanonicalHostName);
          hostName = getCanonicalHostName;
        } else {
          logger.info("Using getHostName()=" + getHostName);
          hostName = getHostName;
        }
        logger.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName
          + ", getCanonicalHostName=" + getCanonicalHostName + ", hostName="
          + hostName);
      } catch (UnknownHostException e) {
        logger.error("Error getting hostname.", e);
      }
    }
    return hostName;
  }

  /** Concatenates two arrays, treating null inputs as empty. */
  public static String[] mergeArray(String[] first, String[] second) {
    if (first == null) {
      first = new String[0];
    }
    if (second == null) {
      second = new String[0];
    }
    String[] mergedArray = ObjectArrays.concat(first, second, String.class);
    return mergedArray;
  }

  /**
   * Lazily resolves the logfeeder temp directory from configuration,
   * substituting $username placeholders; double-checked under _LOCK.
   */
  public static String getLogfeederTempDir() {
    if (logfeederTempDir == null) {
      synchronized (_LOCK) {
        if (logfeederTempDir == null) {
          String tempDirValue = getStringProperty("logfeeder.tmp.dir",
            "/tmp/$username/logfeeder/");
          HashMap<String, String> contextParam = new HashMap<String, String>();
          String username = System.getProperty("user.name");
          contextParam.put("username", username);
          logfeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue,
            contextParam);
        }
      }
    }
    return logfeederTempDir;
  }
}
/**
 * Mutable bookkeeping holder for a single LogFeeder metric.
 * Fields are public and mutated directly by the components that own the
 * metric (e.g. MetricsMgr, LogFeederUtil.logStatForMetric); the class itself
 * provides no synchronization.
 */
public class MetricCount {
  // Name under which the metric is published to AMS; null means "do not publish".
  public String metricsName = null;
  // true: publish the current value as-is; false: publish the delta since the last publish.
  public boolean isPointInTime = false;

  // Running event count, incremented by the metric owner.
  public long count = 0;
  // Snapshot of count at the last log line (see LogFeederUtil.logStatForMetric).
  public long prevLogCount = 0;
  // Timestamp (ms) of the last log line emitted for this metric.
  public long prevLogMS = System.currentTimeMillis();
  // Snapshot of count at the last publish to AMS (delta baseline).
  public long prevPublishCount = 0;
  public int publishCount = 0; // Count of published metrics. Used for first time sending metrics
}
/**
 * Collects LogFeeder metric counters and periodically publishes them to the
 * Ambari Metrics Service (AMS) through {@link LogFeederAMSClient}.
 * Metrics accumulate in memory between publishes; if AMS stays unreachable
 * longer than maxMetricsBuffer, the buffer is purged to bound memory use.
 */
public class MetricsMgr {
  private static final Logger logger = Logger.getLogger(MetricsMgr.class);

  private boolean isMetricsEnabled = false;
  // Host name reported with each metric; from "node.hostname" or InetAddress lookup.
  private String nodeHostName = null;
  private String appId = "logfeeder";

  private long lastPublishTimeMS = 0; // Let's do the first publish immediately
  private long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the clock

  private int publishIntervalMS = 60 * 1000;
  private int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep
                                                 // the metrics in memory forever
  // Accumulated, not-yet-published metrics keyed by metric name.
  private HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>();
  private LogFeederAMSClient amsClient = null;

  /**
   * Creates the AMS client and resolves the node host name. Metrics publishing
   * is enabled only when a collector URI is configured AND a host name could
   * be determined; otherwise it stays disabled.
   */
  public void init() {
    logger.info("Initializing MetricsMgr()");
    amsClient = new LogFeederAMSClient();

    if (amsClient.getCollectorUri(null) != null) {
      nodeHostName = LogFeederUtil.getStringProperty("node.hostname");
      if (nodeHostName == null) {
        try {
          nodeHostName = InetAddress.getLocalHost().getHostName();
        } catch (Throwable e) {
          logger.warn(
            "Error getting hostname using InetAddress.getLocalHost().getHostName()",
            e);
        }
        if (nodeHostName == null) {
          // Fall back to the canonical name if the plain lookup failed.
          try {
            nodeHostName = InetAddress.getLocalHost()
              .getCanonicalHostName();
          } catch (Throwable e) {
            logger.warn(
              "Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()",
              e);
          }
        }
      }
      if (nodeHostName == null) {
        isMetricsEnabled = false;
        logger.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics");
      } else {
        isMetricsEnabled = true;
        logger.info("LogFeeder Metrics is enabled. Metrics host="
          + amsClient.getCollectorUri(null));
      }
    } else {
      logger.info("LogFeeder Metrics publish is disabled");
    }
  }

  public boolean isMetricsEnabled() {
    return isMetricsEnabled;
  }

  /**
   * Folds the given counters into the pending metrics buffer and, when the
   * publish interval has elapsed, emits the buffer to AMS. Point-in-time
   * metrics record the current count; cumulative ones record the delta since
   * their last publish. Synchronized because callers may run on different
   * threads and the buffer/bookkeeping fields are shared.
   */
  synchronized public void useMetrics(List<MetricCount> metricsList) {
    if (!isMetricsEnabled) {
      return;
    }
    logger.info("useMetrics() metrics.size=" + metricsList.size());
    long currMS = System.currentTimeMillis();
    Long currMSLong = new Long(currMS);
    for (MetricCount metric : metricsList) {
      if (metric.metricsName == null) {
        logger.debug("metric.metricsName is null");
        // Metrics is not meant to be published
        continue;
      }
      long currCount = metric.count;
      if (!metric.isPointInTime && metric.publishCount > 0
        && currCount <= metric.prevPublishCount) {
        // No new data added, so let's ignore it
        logger.debug("Nothing changed. " + metric.metricsName
          + ", currCount=" + currCount + ", prevPublishCount="
          + metric.prevPublishCount);
        continue;
      }
      metric.publishCount++;

      TimelineMetric timelineMetric = metricsMap.get(metric.metricsName);
      if (timelineMetric == null) {
        logger.debug("Creating new metric obbject for "
          + metric.metricsName);
        // First time for this metric
        timelineMetric = new TimelineMetric();
        timelineMetric.setMetricName(metric.metricsName);
        timelineMetric.setHostName(nodeHostName);
        timelineMetric.setAppId(appId);
        timelineMetric.setStartTime(currMS);
        timelineMetric.setType("Long");
        timelineMetric.setMetricValues(new TreeMap<Long, Double>());

        metricsMap.put(metric.metricsName, timelineMetric);
      }
      logger.debug("Adding metrics=" + metric.metricsName);
      if (metric.isPointInTime) {
        timelineMetric.getMetricValues().put(currMSLong,
          new Double(currCount));
      } else {
        // Cumulative metric: add the delta since the last publish to any
        // value already buffered for this timestamp.
        Double value = timelineMetric.getMetricValues().get(currMSLong);
        if (value == null) {
          value = new Double(0);
        }
        value += (currCount - metric.prevPublishCount);
        timelineMetric.getMetricValues().put(currMSLong, value);
        metric.prevPublishCount = currCount;
      }
    }

    if (metricsMap.size() > 0
      && currMS - lastPublishTimeMS > publishIntervalMS) {
      try {
        // Time to publish
        TimelineMetrics timelineMetrics = new TimelineMetrics();
        List<TimelineMetric> timeLineMetricList = new ArrayList<TimelineMetric>();
        timeLineMetricList.addAll(metricsMap.values());
        timelineMetrics.setMetrics(timeLineMetricList);
        amsClient.emitMetrics(timelineMetrics);
        logger.info("Published " + timeLineMetricList.size()
          + " metrics to AMS");
        metricsMap.clear();
        timeLineMetricList.clear();
        lastPublishTimeMS = currMS;
      } catch (Throwable t) {
        logger.warn("Error sending metrics to AMS.", t);
        if (currMS - lastFailedPublishTimeMS > maxMetricsBuffer) {
          // Bound memory usage: drop everything buffered during the outage.
          logger.error("AMS was not sent for last "
            + maxMetricsBuffer
            / 1000
            + " seconds. Purging it and will start rebuilding it again");
          metricsMap.clear();
          lastFailedPublishTimeMS = currMS;
        }
      }
    } else {
      logger.info("Not publishing metrics. metrics.size()="
        + metricsMap.size() + ", lastPublished="
        + (currMS - lastPublishTimeMS) / 1000
        + " seconds ago, intervalConfigured=" + publishIntervalMS
        / 1000);
    }
  }
}
/**
 * This is a very fast, non-cryptographic hash suitable for general hash-based
 * lookup. See http://murmurhash.googlepages.com/ for more details.
 * <p/>
 * <p>The C version of MurmurHash 2.0 found at that site was ported
 * to Java by Andrzej Bialecki (ab at getopt org).</p>
 */
public final class MurmurHash {

  // Utility class: no instances.
  private MurmurHash() {
  }

  /**
   * Hashes an int.
   *
   * @param data The int to hash.
   * @param seed The seed for the hash.
   * @return The 32 bit hash of the bytes in question.
   */
  public static int hash(int data, int seed) {
    // Encode the int in big-endian byte order - the same layout Guava's
    // Ints.toByteArray produced - so hash values are unchanged, but without
    // the third-party dependency.
    return hash(ByteBuffer.allocate(4).putInt(data).array(), seed);
  }

  /**
   * Hashes bytes in an array.
   *
   * @param data The bytes to hash.
   * @param seed The seed for the hash.
   * @return The 32 bit hash of the bytes in question.
   */
  public static int hash(byte[] data, int seed) {
    return hash(ByteBuffer.wrap(data), seed);
  }

  /**
   * Hashes bytes in part of an array.
   *
   * @param data   The data to hash.
   * @param offset Where to start munging.
   * @param length How many bytes to process.
   * @param seed   The seed to start with.
   * @return The 32-bit hash of the data in question.
   */
  public static int hash(byte[] data, int offset, int length, int seed) {
    return hash(ByteBuffer.wrap(data, offset, length), seed);
  }

  /**
   * Hashes the bytes in a buffer from the current position to the limit.
   * The buffer is consumed, but its byte order is restored before returning.
   *
   * @param buf  The bytes to hash.
   * @param seed The seed for the hash.
   * @return The 32 bit murmur hash of the bytes in the buffer.
   */
  public static int hash(ByteBuffer buf, int seed) {
    // save byte order for later restoration
    ByteOrder byteOrder = buf.order();
    buf.order(ByteOrder.LITTLE_ENDIAN);

    // MurmurHash 2.0 mixing constants.
    int m = 0x5bd1e995;
    int r = 24;

    int h = seed ^ buf.remaining();

    // Mix 4 bytes at a time into the hash.
    while (buf.remaining() >= 4) {
      int k = buf.getInt();

      k *= m;
      k ^= k >>> r;
      k *= m;

      h *= m;
      h ^= k;
    }

    // Handle the last 1-3 bytes, zero-padded into a little-endian int.
    if (buf.remaining() > 0) {
      ByteBuffer finish = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
      // for big-endian version, use this first:
      // finish.position(4-buf.remaining());
      finish.put(buf).rewind();
      h ^= finish.getInt();
      h *= m;
    }

    // Final avalanche.
    h ^= h >>> 13;
    h *= m;
    h ^= h >>> 15;

    buf.order(byteOrder);
    return h;
  }

  /** 64-bit MurmurHash 2.0 (variant A) of a whole byte array. */
  public static long hash64A(byte[] data, int seed) {
    return hash64A(ByteBuffer.wrap(data), seed);
  }

  /** 64-bit MurmurHash 2.0 (variant A) of part of a byte array. */
  public static long hash64A(byte[] data, int offset, int length, int seed) {
    return hash64A(ByteBuffer.wrap(data, offset, length), seed);
  }

  /**
   * 64-bit MurmurHash 2.0 (variant A) of the buffer contents from the current
   * position to the limit. The buffer is consumed, but its byte order is
   * restored before returning.
   */
  public static long hash64A(ByteBuffer buf, int seed) {
    ByteOrder byteOrder = buf.order();
    buf.order(ByteOrder.LITTLE_ENDIAN);

    // 64-bit mixing constants.
    long m = 0xc6a4a7935bd1e995L;
    int r = 47;

    long h = seed ^ (buf.remaining() * m);

    // Mix 8 bytes at a time.
    while (buf.remaining() >= 8) {
      long k = buf.getLong();

      k *= m;
      k ^= k >>> r;
      k *= m;

      h ^= k;
      h *= m;
    }

    // Handle the last 1-7 bytes, zero-padded into a little-endian long.
    if (buf.remaining() > 0) {
      ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
      // for big-endian version, do this first:
      // finish.position(8-buf.remaining());
      finish.put(buf).rewind();
      h ^= finish.getLong();
      h *= m;
    }

    // Final avalanche.
    h ^= h >>> r;
    h *= m;
    h ^= h >>> r;

    buf.order(byteOrder);
    return h;
  }

}
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java deleted file mode 100644 index 41b005b..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
package org.apache.ambari.logfeeder;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
import org.apache.ambari.logfeeder.logconfig.filter.FilterLogData;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

/**
 * Routes parsed log events from inputs to their configured outputs.
 * Before delivery each event is enriched with context fields (type, path,
 * host, ip, level), stamped with a sequence number / id / event count, and
 * its log_message is truncated if it exceeds {@link #MAX_OUTPUT_SIZE} bytes.
 */
public class OutputMgr {
  private static final Logger logger = Logger.getLogger(OutputMgr.class);

  // Maximum size in bytes allowed for the log_message field. 32766 - 1.
  private static final int MAX_OUTPUT_SIZE = 32765;

  // Monotonically increasing sequence number stamped on every event.
  // NOTE(review): static and unsynchronized — concurrent writers may produce
  // duplicate seq_num values; confirm single-threaded use by callers.
  private static long docCounter = 0;

  private Collection<Output> outputList = new ArrayList<Output>();
  private boolean addMessageMD5 = true;
  private MetricCount messageTruncateMetric = new MetricCount();

  public Collection<Output> getOutputList() {
    return outputList;
  }

  public void setOutputList(Collection<Output> outputList) {
    this.outputList = outputList;
  }

  /**
   * Enriches the event and forwards it to every output configured for the
   * originating input, if the configured log-level filter allows it.
   *
   * @param jsonObj     mutable event map; enriched in place
   * @param inputMarker marker identifying the input and source line
   */
  public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
    Input input = inputMarker.input;

    enrichWithContextFields(jsonObj, input);
    addEventMD5IfNeeded(jsonObj, input);
    addBookkeepingFields(jsonObj, inputMarker);
    handleLogMessage(jsonObj, input);

    // Check whether this log event is allowed to be sent to the outputs.
    if (FilterLogData.INSTANCE.isAllowed(jsonObj)) {
      for (Output output : input.getOutputList()) {
        try {
          output.write(jsonObj, inputMarker);
        } catch (Exception e) {
          logger.error("Error writing. to " + output.getShortDescription(), e);
        }
      }
    }
  }

  // Copy the input's context fields and standard attributes onto the event,
  // never overwriting a value the filters already set.
  private void enrichWithContextFields(Map<String, Object> jsonObj, Input input) {
    for (Map.Entry<String, String> entry : input.getContextFields().entrySet()) {
      if (jsonObj.get(entry.getKey()) == null) {
        jsonObj.put(entry.getKey(), entry.getValue());
      }
    }

    // TODO: Ideally most of the overrides should be configurable

    if (jsonObj.get("type") == null) {
      jsonObj.put("type", input.getStringValue("type"));
    }
    if (jsonObj.get("path") == null && input.getFilePath() != null) {
      jsonObj.put("path", input.getFilePath());
    }
    if (jsonObj.get("path") == null && input.getStringValue("path") != null) {
      jsonObj.put("path", input.getStringValue("path"));
    }
    if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) {
      jsonObj.put("host", LogFeederUtil.hostName);
    }
    if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) {
      jsonObj.put("ip", LogFeederUtil.ipAddress);
    }
    if (jsonObj.get("level") == null) {
      jsonObj.put("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
    }
  }

  // Optionally hash the whole event (prefixed with logtime): as an id for
  // deduplication (use_event_md5) and/or as an extra event_md5 field.
  private void addEventMD5IfNeeded(Map<String, Object> jsonObj, Input input) {
    if (!input.isUseEventMD5() && !input.isGenEventMD5()) {
      return;
    }
    String prefix = "";
    Object logtimeObj = jsonObj.get("logtime");
    if (logtimeObj != null) {
      if (logtimeObj instanceof Date) {
        prefix = "" + ((Date) logtimeObj).getTime();
      } else {
        prefix = logtimeObj.toString();
      }
    }
    Long eventMD5 = LogFeederUtil.genHash(LogFeederUtil.getGson().toJson(jsonObj));
    if (input.isGenEventMD5()) {
      jsonObj.put("event_md5", prefix + eventMD5.toString());
    }
    if (input.isUseEventMD5()) {
      jsonObj.put("id", prefix + eventMD5.toString());
    }
  }

  // Sequence number, random id, event count and source line number.
  private void addBookkeepingFields(Map<String, Object> jsonObj, InputMarker inputMarker) {
    // fix: autoboxing instead of deprecated new Long(...) / new Integer(...)
    jsonObj.put("seq_num", docCounter++);
    if (jsonObj.get("id") == null) {
      jsonObj.put("id", UUID.randomUUID().toString());
    }
    if (jsonObj.get("event_count") == null) {
      jsonObj.put("event_count", 1);
    }
    if (inputMarker.lineNumber > 0) {
      jsonObj.put("logfile_line_number", inputMarker.lineNumber);
    }
  }

  // Truncate log_message if it exceeds MAX_OUTPUT_SIZE bytes (tagging the
  // event with "error_message_truncated"), then optionally add its MD5.
  private void handleLogMessage(Map<String, Object> jsonObj, Input input) {
    if (!jsonObj.containsKey("log_message")) {
      return;
    }
    // TODO: Let's check size only for log_message for now
    String logMessage = (String) jsonObj.get("log_message");
    if (logMessage != null) {
      // fix: compute the byte representation once instead of three times
      byte[] messageBytes = logMessage.getBytes();
      if (messageBytes.length > MAX_OUTPUT_SIZE) {
        messageTruncateMetric.count++;
        final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() + "_MESSAGESIZE";
        LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
            "Message is too big. size=" + messageBytes.length + ", input="
                + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE
                + ", first upto 100 characters=" + LogFeederUtil.subString(logMessage, 100),
            null, logger, Level.WARN);
        // NOTE(review): byte-based truncation in the platform default charset
        // may split a multi-byte character — confirm this is acceptable.
        logMessage = new String(messageBytes, 0, MAX_OUTPUT_SIZE);
        jsonObj.put("log_message", logMessage);
        // Add error tags
        @SuppressWarnings("unchecked")
        List<String> tagsList = (List<String>) jsonObj.get("tags");
        if (tagsList == null) {
          tagsList = new ArrayList<String>();
          jsonObj.put("tags", tagsList);
        }
        tagsList.add("error_message_truncated");
      }
    }
    if (addMessageMD5) {
      jsonObj.put("message_md5", "" + LogFeederUtil.genHash(logMessage));
    }
  }

  /**
   * Writes an already-serialized JSON block straight to the input's outputs,
   * if the configured log-level filter allows it.
   */
  public void write(String jsonBlock, InputMarker inputMarker) {
    if (FilterLogData.INSTANCE.isAllowed(jsonBlock)) {
      for (Output output : inputMarker.input.getOutputList()) {
        try {
          output.write(jsonBlock, inputMarker);
        } catch (Exception e) {
          logger.error("Error writing. to " + output.getShortDescription(), e);
        }
      }
    }
  }

  /**
   * Asks every output to drain and close, then waits up to {@code iterations}
   * seconds for all of them to finish flushing their pending events.
   */
  public void close() {
    logger.info("Close called for outputs ...");
    for (Output output : outputList) {
      try {
        output.setDrain(true);
        output.close();
      } catch (Exception e) {
        // Ignore: best-effort shutdown; keep closing the remaining outputs
      }
    }
    // TODO: Need to get this value from property
    int iterations = 30;
    int waitTimeMS = 1000;
    int i;
    boolean allClosed = true;
    for (i = 0; i < iterations; i++) {
      allClosed = true;
      for (Output output : outputList) {
        if (!output.isClosed()) {
          allClosed = false;
          logger.warn("Waiting for output to close. " + output.getShortDescription()
              + ", " + (iterations - i) + " more seconds");
          try {
            Thread.sleep(waitTimeMS);
          } catch (InterruptedException e) {
            // fix: restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
          }
        }
      }
      if (allClosed) {
        break;
      }
    }

    if (!allClosed) {
      // fix: typos "outpus" -> "outputs", "pendingCound" -> "pendingCount"
      logger.warn("Some outputs were not closed. Iterations=" + i);
      for (Output output : outputList) {
        if (!output.isClosed()) {
          logger.warn("Output not closed. Will ignore it." + output.getShortDescription()
              + ", pendingCount=" + output.getPendingCount());
        }
      }
    } else {
      logger.info("All outputs are closed. Iterations=" + i);
    }
  }

  /** Logs per-output statistics plus the message-truncation counter. */
  public void logStats() {
    for (Output output : outputList) {
      output.logStat();
    }
    LogFeederUtil.logStatForMetric(messageTruncateMetric, "Stat: Messages Truncated", null);
  }

  /** Registers this manager's metrics and those of every output. */
  public void addMetricsContainers(List<MetricCount> metricsList) {
    metricsList.add(messageTruncateMetric);
    for (Output output : outputList) {
      output.addMetricsContainers(metricsList);
    }
  }

  /** Hands a whole file to each of the input's outputs for copying. */
  public void copyFile(File inputFile, InputMarker inputMarker) {
    Input input = inputMarker.input;
    for (Output output : input.getOutputList()) {
      try {
        output.copyFile(inputFile, inputMarker);
      } catch (Exception e) {
        // fix: typo "Error coyping file . to" -> "Error copying file to"
        logger.error("Error copying file to " + output.getShortDescription(), e);
      }
    }
  }
}
package org.apache.ambari.logfeeder.common;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.ambari.logfeeder.metrics.MetricCount;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;

/**
 * Base class for the configurable pieces of the Logfeeder pipeline
 * (inputs, filters, outputs). Holds the raw configuration map loaded from
 * JSON and provides typed accessors, "conditions" evaluation, per-block
 * statistics and drain handling.
 */
public abstract class ConfigBlock {
  // fix: canonical modifier order (was "static private")
  private static Logger logger = Logger.getLogger(ConfigBlock.class);

  // Set to true when the block should stop accepting new work and flush.
  private boolean drain = false;

  // Raw configuration for this block, cloned from the loaded JSON.
  protected Map<String, Object> configs;
  // Extra fields from "add_fields" that are stamped onto every event.
  protected Map<String, String> contextFields = new HashMap<String, String>();
  public MetricCount statMetric = new MetricCount();

  public ConfigBlock() {
    super();
  }

  /**
   * Used while logging. Keep it short and meaningful.
   */
  public abstract String getShortDescription();

  /**
   * Every implementor needs to give a name to the thread they create.
   */
  public String getNameForThread() {
    return this.getClass().getSimpleName();
  }

  /**
   * Registers this block's metric containers so they can be published.
   *
   * @param metricsList collector the metrics are added to
   */
  public void addMetricsContainers(List<MetricCount> metricsList) {
    metricsList.add(statMetric);
  }

  /**
   * This method needs to be overwritten by deriving classes.
   */
  public void init() throws Exception {
  }

  /**
   * Clones the given configuration map into this block and merges any
   * "add_fields" entries into the context fields.
   */
  public void loadConfig(Map<String, Object> map) {
    configs = LogFeederUtil.cloneObject(map);

    Map<String, String> nvList = getNVList("add_fields");
    if (nvList != null) {
      contextFields.putAll(nvList);
    }
  }

  public Map<String, Object> getConfigs() {
    return configs;
  }

  /**
   * Returns whether this block is enabled: the "is_enabled" flag (default
   * true), further restricted by the optional static "conditions" section —
   * currently only "fields" conditions are evaluated, and any single
   * matching field value enables the block.
   */
  @SuppressWarnings("unchecked")
  public boolean isEnabled() {
    boolean isEnabled = getBooleanValue("is_enabled", true);
    if (isEnabled) {
      // Let's check for static conditions
      Map<String, Object> conditions = (Map<String, Object>) configs.get("conditions");
      boolean allow = true;
      if (conditions != null && conditions.size() > 0) {
        allow = false;
        for (String conditionType : conditions.keySet()) {
          if (conditionType.equalsIgnoreCase("fields")) {
            Map<String, Object> fields = (Map<String, Object>) conditions.get("fields");
            for (String fieldName : fields.keySet()) {
              Object values = fields.get(fieldName);
              if (values instanceof String) {
                allow = isFieldConditionMatch(fieldName, (String) values);
              } else {
                // A list of candidate values: first match wins.
                List<String> listValues = (List<String>) values;
                for (String stringValue : listValues) {
                  allow = isFieldConditionMatch(fieldName, stringValue);
                  if (allow) {
                    break;
                  }
                }
              }
              if (allow) {
                break;
              }
            }
          }
          if (allow) {
            break;
          }
        }
        isEnabled = allow;
      }
    }
    return isEnabled;
  }

  /**
   * Returns true if the named field matches the given value (case
   * insensitive), checking the top-level configs first and then the
   * "add_fields" section.
   */
  public boolean isFieldConditionMatch(String fieldName, String stringValue) {
    boolean allow = false;
    String fieldValue = (String) configs.get(fieldName);
    if (fieldValue != null && fieldValue.equalsIgnoreCase(stringValue)) {
      allow = true;
    } else {
      @SuppressWarnings("unchecked")
      Map<String, Object> addFields = (Map<String, Object>) configs.get("add_fields");
      if (addFields != null && addFields.get(fieldName) != null) {
        String addFieldValue = (String) addFields.get(fieldName);
        if (stringValue.equalsIgnoreCase(addFieldValue)) {
          allow = true;
        }
      }
    }
    return allow;
  }

  @SuppressWarnings("unchecked")
  public Map<String, String> getNVList(String key) {
    return (Map<String, String>) configs.get(key);
  }

  /**
   * Returns the config value as a String, treating the literal "none"
   * (case insensitive) as absent.
   */
  public String getStringValue(String key) {
    Object value = configs.get(key);
    if (value != null && value.toString().equalsIgnoreCase("none")) {
      value = null;
    }
    if (value != null) {
      return value.toString();
    }
    return null;
  }

  /**
   * Same as {@link #getStringValue(String)} but with a fallback value.
   */
  public String getStringValue(String key, String defaultValue) {
    Object value = configs.get(key);
    if (value != null && value.toString().equalsIgnoreCase("none")) {
      value = null;
    }
    if (value != null) {
      return value.toString();
    }
    return defaultValue;
  }

  public Object getConfigValue(String key) {
    return configs.get(key);
  }

  /**
   * Returns the config value as a boolean; "true" and "yes" (case
   * insensitive) mean true, any other non-empty value means false.
   */
  public boolean getBooleanValue(String key, boolean defaultValue) {
    String strValue = getStringValue(key);
    boolean retValue = defaultValue;
    if (!StringUtils.isEmpty(strValue)) {
      retValue = strValue.equalsIgnoreCase("true") || strValue.equalsIgnoreCase("yes");
    }
    return retValue;
  }

  /**
   * Returns the config value parsed as an int; falls back to the default
   * (with an error log) when the value is not a valid integer.
   */
  public int getIntValue(String key, int defaultValue) {
    String strValue = getStringValue(key);
    int retValue = defaultValue;
    if (!StringUtils.isEmpty(strValue)) {
      // fix: catch the specific parse failure instead of Throwable
      try {
        retValue = Integer.parseInt(strValue);
      } catch (NumberFormatException e) {
        logger.error("Error parsing integer value. key=" + key + ", value=" + strValue);
      }
    }
    return retValue;
  }

  /**
   * Returns the config value parsed as a long; falls back to the default
   * (with an error log) when the value is not a valid long.
   */
  public long getLongValue(String key, long defaultValue) {
    String strValue = getStringValue(key);
    // fix: primitive long instead of boxed Long (avoids needless boxing)
    long retValue = defaultValue;
    if (!StringUtils.isEmpty(strValue)) {
      // fix: catch the specific parse failure instead of Throwable
      try {
        retValue = Long.parseLong(strValue);
      } catch (NumberFormatException e) {
        logger.error("Error parsing long value. key=" + key + ", value=" + strValue);
      }
    }
    return retValue;
  }

  public Map<String, String> getContextFields() {
    return contextFields;
  }

  public void incrementStat(int count) {
    statMetric.count += count;
  }

  public void logStatForMetric(MetricCount metric, String prefixStr) {
    LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" + getShortDescription());
  }

  public synchronized void logStat() {
    logStatForMetric(statMetric, "Stat");
  }

  /**
   * Dumps this block's configuration at the given log level; returns false
   * without logging when that level is disabled.
   * NOTE: method name typo ("logConfgs") kept for backward compatibility
   * with existing callers.
   */
  public boolean logConfgs(Priority level) {
    if (level.toInt() == Priority.INFO_INT && !logger.isInfoEnabled()) {
      return false;
    }
    if (level.toInt() == Priority.DEBUG_INT && !logger.isDebugEnabled()) {
      return false;
    }
    logger.log(level, "Printing configuration Block=" + getShortDescription());
    logger.log(level, "configs=" + configs);
    logger.log(level, "contextFields=" + contextFields);
    return true;
  }

  public boolean isDrain() {
    return drain;
  }

  public void setDrain(boolean drain) {
    this.drain = drain;
  }
}
