AMBARI-18246. Clean up Log Feeder (Miklos Gergely via oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/51fdb2de Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/51fdb2de Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/51fdb2de Branch: refs/heads/branch-dev-patch-upgrade Commit: 51fdb2de5aad4c61593796a4edcdc4eda762289a Parents: 276cd5b Author: Miklos Gergely <[email protected]> Authored: Wed Sep 7 22:24:51 2016 +0200 Committer: oleewere <[email protected]> Committed: Sat Nov 5 16:05:14 2016 +0100 ---------------------------------------------------------------------- .../org/apache/ambari/logfeeder/LogFeeder.java | 594 +++++++------------ .../ambari/logfeeder/common/ConfigBlock.java | 71 +-- .../logfeeder/common/LogFeederConstants.java | 39 ++ .../apache/ambari/logfeeder/filter/Filter.java | 55 +- .../ambari/logfeeder/filter/FilterGrok.java | 94 +-- .../ambari/logfeeder/filter/FilterJSON.java | 8 +- .../ambari/logfeeder/filter/FilterKeyValue.java | 44 +- .../logfeeder/input/AbstractInputFile.java | 319 ++++++++++ .../apache/ambari/logfeeder/input/Input.java | 313 +++++----- .../ambari/logfeeder/input/InputFile.java | 503 ++-------------- .../ambari/logfeeder/input/InputManager.java | 379 ++++++++++++ .../ambari/logfeeder/input/InputMarker.java | 17 +- .../apache/ambari/logfeeder/input/InputMgr.java | 451 -------------- .../ambari/logfeeder/input/InputS3File.java | 424 +------------ .../ambari/logfeeder/input/InputSimulate.java | 40 +- .../logfeeder/input/reader/GZIPReader.java | 23 +- .../input/reader/LogsearchReaderFactory.java | 8 +- .../logconfig/FetchConfigFromSolr.java | 194 ------ .../logfeeder/logconfig/FilterLogData.java | 83 +++ .../logfeeder/logconfig/LogConfigFetcher.java | 168 ++++++ .../logfeeder/logconfig/LogConfigHandler.java | 189 ++++++ .../logfeeder/logconfig/LogFeederConstants.java | 34 -- .../logfeeder/logconfig/LogFeederFilter.java | 90 +++ .../logconfig/LogFeederFilterWrapper.java | 55 ++ .../logfeeder/logconfig/LogfeederScheduler.java | 59 -- .../logconfig/filter/ApplyLogFilter.java | 62 -- .../logconfig/filter/DefaultDataFilter.java | 49 -- .../logconfig/filter/FilterLogData.java | 53 -- .../apache/ambari/logfeeder/mapper/Mapper.java | 14 +- .../ambari/logfeeder/mapper/MapperDate.java | 32 +- .../logfeeder/mapper/MapperFieldName.java | 20 +- .../logfeeder/mapper/MapperFieldValue.java | 31 +- .../logfeeder/metrics/LogFeederAMSClient.java | 10 +- .../ambari/logfeeder/metrics/MetricCount.java | 31 - .../ambari/logfeeder/metrics/MetricData.java | 46 ++ .../logfeeder/metrics/MetricsManager.java | 168 ++++++ .../ambari/logfeeder/metrics/MetricsMgr.java | 178 ------ .../apache/ambari/logfeeder/output/Output.java | 13 +- .../ambari/logfeeder/output/OutputData.java | 8 +- .../ambari/logfeeder/output/OutputDevNull.java | 7 +- .../ambari/logfeeder/output/OutputFile.java | 42 +- .../ambari/logfeeder/output/OutputHDFSFile.java | 70 +-- .../ambari/logfeeder/output/OutputKafka.java | 58 +- .../ambari/logfeeder/output/OutputManager.java | 250 ++++++++ .../ambari/logfeeder/output/OutputMgr.java | 263 -------- .../ambari/logfeeder/output/OutputS3File.java | 41 +- .../ambari/logfeeder/output/OutputSolr.java | 62 +- .../logfeeder/output/S3LogPathResolver.java | 6 +- .../logfeeder/output/S3OutputConfiguration.java | 5 +- .../ambari/logfeeder/output/S3Uploader.java | 64 +- .../logfeeder/output/spool/LogSpooler.java | 23 +- .../output/spool/LogSpoolerContext.java | 2 +- .../output/spool/LogSpoolerException.java | 2 +- .../output/spool/RolloverCondition.java | 2 +- .../logfeeder/output/spool/RolloverHandler.java | 2 +- .../apache/ambari/logfeeder/util/AWSUtil.java | 52 +- .../apache/ambari/logfeeder/util/AliasUtil.java | 103 ++-- .../ambari/logfeeder/util/CompressionUtil.java | 15 +- .../apache/ambari/logfeeder/util/DateUtil.java | 39 +- .../apache/ambari/logfeeder/util/FileUtil.java | 66 ++- .../ambari/logfeeder/util/LogFeederUtil.java | 511 +++++----------- .../logfeeder/util/LogfeederHDFSUtil.java | 58 +- .../ambari/logfeeder/util/PlaceholderUtil.java | 32 +- .../apache/ambari/logfeeder/util/S3Util.java | 81 +-- .../apache/ambari/logfeeder/util/SolrUtil.java | 186 ------ .../ambari/logfeeder/view/VLogfeederFilter.java | 90 --- .../logfeeder/view/VLogfeederFilterWrapper.java | 55 -- .../org/apache/ambari/logfeeder/AppTest.java | 116 ---- .../ambari/logfeeder/filter/FilterGrokTest.java | 55 +- .../ambari/logfeeder/filter/FilterJSONTest.java | 41 +- .../logfeeder/filter/FilterKeyValueTest.java | 34 +- .../ambari/logfeeder/input/InputFileTest.java | 24 +- .../logfeeder/input/InputManagerTest.java | 241 ++++++++ .../logconfig/LogConfigHandlerTest.java | 117 ++++ .../ambari/logfeeder/mapper/MapperDateTest.java | 17 +- .../logfeeder/mapper/MapperFieldNameTest.java | 2 +- .../logfeeder/mapper/MapperFieldValueTest.java | 2 +- .../logfeeder/metrics/MetrcisManagerTest.java | 128 ++++ .../logfeeder/output/OutputKafkaTest.java | 5 +- .../logfeeder/output/OutputManagerTest.java | 256 ++++++++ .../logfeeder/output/OutputS3FileTest.java | 17 +- .../ambari/logfeeder/output/OutputSolrTest.java | 5 +- .../logfeeder/output/S3LogPathResolverTest.java | 2 +- .../ambari/logfeeder/output/S3UploaderTest.java | 42 +- .../logfeeder/output/spool/LogSpoolerTest.java | 2 +- .../ambari/logfeeder/util/AWSUtilTest.java | 29 - .../logfeeder/util/PlaceholderUtilTest.java | 20 +- .../ambari/logfeeder/util/S3UtilTest.java | 4 +- .../src/test/resources/logfeeder.properties | 20 + 89 files changed, 3854 insertions(+), 4481 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java index 373d743..6d0f22c 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -20,14 +20,10 @@ package org.apache.ambari.logfeeder; import java.io.BufferedInputStream; -import java.io.BufferedReader; import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; import java.lang.reflect.Type; import java.util.ArrayList; -import java.util.Collection; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.Date; @@ -39,18 +35,21 @@ import java.util.Set; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.input.Input; -import org.apache.ambari.logfeeder.input.InputMgr; +import org.apache.ambari.logfeeder.input.InputManager; import org.apache.ambari.logfeeder.input.InputSimulate; -import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler; -import org.apache.ambari.logfeeder.metrics.MetricCount; -import org.apache.ambari.logfeeder.metrics.MetricsMgr; +import org.apache.ambari.logfeeder.logconfig.LogConfigHandler; +import org.apache.ambari.logfeeder.metrics.MetricData; +import org.apache.ambari.logfeeder.metrics.MetricsManager; import org.apache.ambari.logfeeder.output.Output; -import org.apache.ambari.logfeeder.output.OutputMgr; +import org.apache.ambari.logfeeder.output.OutputManager; import org.apache.ambari.logfeeder.util.AliasUtil; import org.apache.ambari.logfeeder.util.FileUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_PARAM; -import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_TYPE; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.ambari.logfeeder.util.AliasUtil.AliasType; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -58,171 +57,142 @@ import org.apache.log4j.Logger; import com.google.gson.reflect.TypeToken; public class LogFeeder { - private static final Logger logger = Logger.getLogger(LogFeeder.class); + private static final Logger LOG = Logger.getLogger(LogFeeder.class); private static final int LOGFEEDER_SHUTDOWN_HOOK_PRIORITY = 30; + private static final int CHECKPOINT_CLEAN_INTERVAL_MS = 24 * 60 * 60 * 60 * 1000; // 24 hours - private Collection<Output> outputList = new ArrayList<Output>(); + private OutputManager outputManager = new OutputManager(); + private InputManager inputManager = new InputManager(); + private MetricsManager metricsManager = new MetricsManager(); - private OutputMgr outMgr = new OutputMgr(); - private InputMgr inputMgr = new InputMgr(); - private MetricsMgr metricsMgr = new MetricsMgr(); + public static Map<String, Object> globalConfigs = new HashMap<>(); - public static Map<String, Object> globalMap = null; - private String[] inputParams; - - private List<Map<String, Object>> globalConfigList = new ArrayList<Map<String, Object>>(); - private List<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>(); - private List<Map<String, Object>> filterConfigList = new ArrayList<Map<String, Object>>(); - private List<Map<String, Object>> outputConfigList = new ArrayList<Map<String, Object>>(); + private List<Map<String, Object>> inputConfigList = new ArrayList<>(); + private List<Map<String, Object>> filterConfigList = new ArrayList<>(); + private List<Map<String, Object>> outputConfigList = new ArrayList<>(); - private int checkPointCleanIntervalMS = 24 * 60 * 60 * 60 * 1000; // 24 hours private long lastCheckPointCleanedMS = 0; - - private static boolean isLogfeederCompleted = false; - + private boolean isLogfeederCompleted = false; private Thread statLoggerThread = null; - private LogFeeder(String[] args) { - inputParams = args; + private LogFeeder() {} + + public void run() { + try { + init(); + monitor(); + waitOnAllDaemonThreads(); + } catch (Throwable t) { + LOG.fatal("Caught exception in main.", t); + System.exit(1); + } } private void init() throws Throwable { + Date startTime = new Date(); - LogFeederUtil.loadProperties("logfeeder.properties", inputParams); - - String configFiles = LogFeederUtil.getStringProperty("logfeeder.config.files"); - logger.info("logfeeder.config.files=" + configFiles); + loadConfigFiles(); + addSimulatedInputs(); + mergeAllConfigs(); - String[] configFileList = null; - if (configFiles != null) { - configFileList = configFiles.split(","); - } - //list of config those are there in cmd line config dir , end with .json - String[] cmdLineConfigs = getConfigFromCmdLine(); - //merge both config - String mergedConfigList[] = LogFeederUtil.mergeArray(configFileList, - cmdLineConfigs); - //mergedConfigList is null then set default conifg - if (mergedConfigList == null || mergedConfigList.length == 0) { - mergedConfigList = LogFeederUtil.getStringProperty("config.file", - "config.json").split(","); - } - for (String configFileName : mergedConfigList) { - logger.info("Going to load config file:" + configFileName); - //escape space from config file path - configFileName= configFileName.replace("\\ ", "%20"); + LogConfigHandler.handleConfig(); + + outputManager.init(); + inputManager.init(); + metricsManager.init(); + + LOG.debug("=============="); + + Date endTime = new Date(); + LOG.info("Took " + (endTime.getTime() - startTime.getTime()) + " ms to initialize"); + } + + private void loadConfigFiles() throws Exception { + List<String> configFiles = getConfigFiles(); + for (String configFileName : configFiles) { + LOG.info("Going to load config file:" + configFileName); + configFileName = configFileName.replace("\\ ", "%20"); File configFile = new File(configFileName); if (configFile.exists() && configFile.isFile()) { - logger.info("Config file exists in path." - + configFile.getAbsolutePath()); + LOG.info("Config file exists in path." + configFile.getAbsolutePath()); loadConfigsUsingFile(configFile); } else { - // Let's try to load it from class loader - logger.info("Trying to load config file from classloader: " - + configFileName); + LOG.info("Trying to load config file from classloader: " + configFileName); loadConfigsUsingClassLoader(configFileName); - logger.info("Loaded config file from classloader: " - + configFileName); + LOG.info("Loaded config file from classloader: " + configFileName); } } - - addSimulatedInputs(); - - mergeAllConfigs(); - - LogfeederScheduler.INSTANCE.start(); - - outMgr.setOutputList(outputList); - for (Output output : outputList) { - output.init(); - } - inputMgr.init(); - metricsMgr.init(); - logger.debug("=============="); } - private void loadConfigsUsingClassLoader(String configFileName) throws Exception { - BufferedInputStream fileInputStream = null; - BufferedReader br = null; - try { - fileInputStream = (BufferedInputStream) this - .getClass().getClassLoader() - .getResourceAsStream(configFileName); - if (fileInputStream != null) { - br = new BufferedReader(new InputStreamReader( - fileInputStream)); - String configData = readFile(br); - loadConfigs(configData); - } else { - throw new Exception("Can't find configFile=" + configFileName); - } - } finally { - if (br != null) { - try { - br.close(); - } catch (IOException e) { - } - } + private List<String> getConfigFiles() { + List<String> configFiles = new ArrayList<>(); + + String logfeederConfigFilesProperty = LogFeederUtil.getStringProperty("logfeeder.config.files"); + LOG.info("logfeeder.config.files=" + logfeederConfigFilesProperty); + if (logfeederConfigFilesProperty != null) { + configFiles.addAll(Arrays.asList(logfeederConfigFilesProperty.split(","))); + } - if (fileInputStream != null) { - try { - fileInputStream.close(); - } catch (IOException e) { - } + String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir"); + if (StringUtils.isNotEmpty(inputConfigDir)) { + File configDirFile = new File(inputConfigDir); + List<File> inputConfigFiles = FileUtil.getAllFileFromDir(configDirFile, "json", false); + for (File inputConfigFile : inputConfigFiles) { + configFiles.add(inputConfigFile.getAbsolutePath()); } } + + if (CollectionUtils.isEmpty(configFiles)) { + String configFileProperty = LogFeederUtil.getStringProperty("config.file", "config.json"); + configFiles.addAll(Arrays.asList(configFileProperty.split(","))); + } + + return configFiles; } - /** - * This method loads the configurations from the given file. - */ private void loadConfigsUsingFile(File configFile) throws Exception { - FileInputStream fileInputStream = null; try { - fileInputStream = new FileInputStream(configFile); - BufferedReader br = new BufferedReader(new InputStreamReader( - fileInputStream)); - String configData = readFile(br); + String configData = FileUtils.readFileToString(configFile); loadConfigs(configData); } catch (Exception t) { - logger.error("Error opening config file. configFilePath=" - + configFile.getAbsolutePath()); + LOG.error("Error opening config file. configFilePath=" + configFile.getAbsolutePath()); throw t; - } finally { - if (fileInputStream != null) { - try { - fileInputStream.close(); - } catch (Throwable t) { - // ignore - } - } + } + } + + private void loadConfigsUsingClassLoader(String configFileName) throws Exception { + try (BufferedInputStream fis = (BufferedInputStream) this.getClass().getClassLoader().getResourceAsStream(configFileName)) { + String configData = IOUtils.toString(fis); + loadConfigs(configData); } } @SuppressWarnings("unchecked") private void loadConfigs(String configData) throws Exception { - Type type = new TypeToken<Map<String, Object>>() { - }.getType(); - Map<String, Object> configMap = LogFeederUtil.getGson().fromJson( - configData, type); + Type type = new TypeToken<Map<String, Object>>() {}.getType(); + Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(configData, type); // Get the globals for (String key : configMap.keySet()) { - if (key.equalsIgnoreCase("global")) { - globalConfigList.add((Map<String, Object>) configMap.get(key)); - } else if (key.equalsIgnoreCase("input")) { - List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap - .get(key); - inputConfigList.addAll(mapList); - } else if (key.equalsIgnoreCase("filter")) { - List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap - .get(key); - filterConfigList.addAll(mapList); - } else if (key.equalsIgnoreCase("output")) { - List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap - .get(key); - outputConfigList.addAll(mapList); + switch (key) { + case "global" : + globalConfigs.putAll((Map<String, Object>) configMap.get(key)); + break; + case "input" : + List<Map<String, Object>> inputConfig = (List<Map<String, Object>>) configMap.get(key); + inputConfigList.addAll(inputConfig); + break; + case "filter" : + List<Map<String, Object>> filterConfig = (List<Map<String, Object>>) configMap.get(key); + filterConfigList.addAll(filterConfig); + break; + case "output" : + List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) configMap.get(key); + outputConfigList.addAll(outputConfig); + break; + default : + LOG.warn("Unknown config key: " + key); } } } @@ -244,231 +214,175 @@ public class LogFeeder { } private void mergeAllConfigs() { - globalMap = mergeConfigs(globalConfigList); + loadOutputs(); + loadInputs(); + loadFilters(); + + assignOutputsToInputs(); + } - sortBlocks(filterConfigList); - // First loop for output + private void loadOutputs() { for (Map<String, Object> map : outputConfigList) { if (map == null) { continue; } - mergeBlocks(globalMap, map); + mergeBlocks(globalConfigs, map); String value = (String) map.get("destination"); - Output output; - if (value == null || value.isEmpty()) { - logger.error("Output block doesn't have destination element"); - continue; - } - String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.OUTPUT, ALIAS_PARAM.KLASS); - if (classFullName == null || classFullName.isEmpty()) { - logger.error("Destination block doesn't have output element"); + if (StringUtils.isEmpty(value)) { + LOG.error("Output block doesn't have destination element"); continue; } - output = (Output) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.OUTPUT); - + Output output = (Output) AliasUtil.getClassInstance(value, AliasType.OUTPUT); if (output == null) { - logger.error("Destination Object is null"); + LOG.error("Output object could not be found"); continue; } - output.setDestination(value); output.loadConfig(map); - // We will only check for is_enabled out here. Down below we will - // check whether this output is enabled for the input - boolean isEnabled = output.getBooleanValue("is_enabled", true); - if (isEnabled) { - outputList.add(output); + // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input + if (output.getBooleanValue("is_enabled", true)) { output.logConfgs(Level.INFO); + outputManager.add(output); } else { - logger.info("Output is disabled. So ignoring it. " - + output.getShortDescription()); + LOG.info("Output is disabled. So ignoring it. " + output.getShortDescription()); } } + } - // Second loop for input + private void loadInputs() { for (Map<String, Object> map : inputConfigList) { if (map == null) { continue; } - mergeBlocks(globalMap, map); + mergeBlocks(globalConfigs, map); String value = (String) map.get("source"); - Input input; - if (value == null || value.isEmpty()) { - logger.error("Input block doesn't have source element"); - continue; - } - String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.INPUT, ALIAS_PARAM.KLASS); - if (classFullName == null || classFullName.isEmpty()) { - logger.error("Source block doesn't have source element"); + if (StringUtils.isEmpty(value)) { + LOG.error("Input block doesn't have source element"); continue; } - input = (Input) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.INPUT); - + Input input = (Input) AliasUtil.getClassInstance(value, AliasType.INPUT); if (input == null) { - logger.error("Source Object is null"); + LOG.error("Input object could not be found"); continue; } - input.setType(value); input.loadConfig(map); if (input.isEnabled()) { - input.setOutputMgr(outMgr); - input.setInputMgr(inputMgr); - inputMgr.add(input); + input.setOutputManager(outputManager); + input.setInputManager(inputManager); + inputManager.add(input); input.logConfgs(Level.INFO); } else { - logger.info("Input is disabled. So ignoring it. " - + input.getShortDescription()); + LOG.info("Input is disabled. So ignoring it. " + input.getShortDescription()); } } + } + + private void loadFilters() { + sortFilters(); - // Third loop is for filter, but we will have to create a filter - // instance for each input, so it can maintain the state per input List<Input> toRemoveInputList = new ArrayList<Input>(); - for (Input input : inputMgr.getInputList()) { - Filter prevFilter = null; + for (Input input : inputManager.getInputList()) { for (Map<String, Object> map : filterConfigList) { if (map == null) { continue; } - mergeBlocks(globalMap, map); + mergeBlocks(globalConfigs, map); String value = (String) map.get("filter"); - Filter filter; - if (value == null || value.isEmpty()) { - logger.error("Filter block doesn't have filter element"); - continue; - } - - String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.FILTER, ALIAS_PARAM.KLASS); - if (classFullName == null || classFullName.isEmpty()) { - logger.error("Filter block doesn't have filter element"); + if (StringUtils.isEmpty(value)) { + LOG.error("Filter block doesn't have filter element"); continue; } - filter = (Filter) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.FILTER); - + Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasType.FILTER); if (filter == null) { - logger.error("Filter Object is null"); + LOG.error("Filter object could not be found"); continue; } filter.loadConfig(map); filter.setInput(input); if (filter.isEnabled()) { - filter.setOutputMgr(outMgr); - if (prevFilter == null) { - input.setFirstFilter(filter); - } else { - prevFilter.setNextFilter(filter); - } - prevFilter = filter; + filter.setOutputManager(outputManager); + input.addFilter(filter); filter.logConfgs(Level.INFO); } else { - logger.debug("Ignoring filter " - + filter.getShortDescription() + " for input " - + input.getShortDescription()); + LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription()); } } + if (input.getFirstFilter() == null) { toRemoveInputList.add(input); } } - // Fourth loop is for associating valid outputs to input - Set<Output> usedOutputSet = new HashSet<Output>(); - for (Input input : inputMgr.getInputList()) { - for (Output output : outputList) { - boolean ret = LogFeederUtil.isEnabled(output.getConfigs(), - input.getConfigs()); - if (ret) { - usedOutputSet.add(output); - input.addOutput(output); - } - } - } - outputList = usedOutputSet; - for (Input toRemoveInput : toRemoveInputList) { - logger.warn("There are no filters, we will ignore this input. " - + toRemoveInput.getShortDescription()); - inputMgr.removeInput(toRemoveInput); + LOG.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription()); + inputManager.removeInput(toRemoveInput); } } - private void sortBlocks(List<Map<String, Object>> blockList) { - - Collections.sort(blockList, new Comparator<Map<String, Object>>() { + private void sortFilters() { + Collections.sort(filterConfigList, new Comparator<Map<String, Object>>() { @Override public int compare(Map<String, Object> o1, Map<String, Object> o2) { Object o1Sort = o1.get("sort_order"); Object o2Sort = o2.get("sort_order"); - if (o1Sort == null) { - return 0; - } - if (o2Sort == null) { + if (o1Sort == null || o2Sort == null) { return 0; } - int o1Value = 0; - if (!(o1Sort instanceof Number)) { - try { - o1Value = (new Double(Double.parseDouble(o1Sort - .toString()))).intValue(); - } catch (Throwable t) { - logger.error("Value is not of type Number. class=" - + o1Sort.getClass().getName() + ", value=" - + o1Sort.toString() + ", map=" + o1.toString()); - } - } else { - o1Value = ((Number) o1Sort).intValue(); - } - int o2Value = 0; - if (!(o2Sort instanceof Integer)) { + + int o1Value = parseSort(o1, o1Sort); + int o2Value = parseSort(o2, o2Sort); + + return o1Value - o2Value; + } + + private int parseSort(Map<String, Object> map, Object o) { + if (!(o instanceof Number)) { try { - o2Value = (new Double(Double.parseDouble(o2Sort - .toString()))).intValue(); + return (new Double(Double.parseDouble(o.toString()))).intValue(); } catch (Throwable t) { - logger.error("Value is not of type Number. class=" - + o2Sort.getClass().getName() + ", value=" - + o2Sort.toString() + ", map=" + o2.toString()); + LOG.error("Value is not of type Number. class=" + o.getClass().getName() + ", value=" + o.toString() + + ", map=" + map.toString()); + return 0; } } else { - + return ((Number) o).intValue(); } - return o1Value - o2Value; } }); } - private Map<String, Object> mergeConfigs( - List<Map<String, Object>> configList) { - Map<String, Object> mergedConfig = new HashMap<String, Object>(); - for (Map<String, Object> config : configList) { - mergeBlocks(config, mergedConfig); + private void assignOutputsToInputs() { + Set<Output> usedOutputSet = new HashSet<Output>(); + for (Input input : inputManager.getInputList()) { + for (Output output : outputManager.getOutputs()) { + if (LogFeederUtil.isEnabled(output.getConfigs(), input.getConfigs())) { + usedOutputSet.add(output); + input.addOutput(output); + } + } } - return mergedConfig; + outputManager.retainUsedOutputs(usedOutputSet); } - private void mergeBlocks(Map<String, Object> fromMap, - Map<String, Object> toMap) { - // Merge the non-string + @SuppressWarnings("unchecked") + private void mergeBlocks(Map<String, Object> fromMap, Map<String, Object> toMap) { for (String key : fromMap.keySet()) { Object objValue = fromMap.get(key); if (objValue == null) { continue; } if (objValue instanceof Map) { - @SuppressWarnings("unchecked") - Map<String, Object> globalFields = LogFeederUtil - .cloneObject((Map<String, Object>) fromMap.get(key)); + Map<String, Object> globalFields = LogFeederUtil.cloneObject((Map<String, Object>) objValue); - @SuppressWarnings("unchecked") - Map<String, Object> localFields = (Map<String, Object>) toMap - .get(key); + Map<String, Object> localFields = (Map<String, Object>) toMap.get(key); if (localFields == null) { localFields = new HashMap<String, Object>(); toMap.put(key, localFields); @@ -477,8 +391,7 @@ public class LogFeeder { if (globalFields != null) { for (String fieldKey : globalFields.keySet()) { if (!localFields.containsKey(fieldKey)) { - localFields.put(fieldKey, - globalFields.get(fieldKey)); + localFields.put(fieldKey, globalFields.get(fieldKey)); } } } @@ -493,11 +406,29 @@ public class LogFeeder { } } + private class JVMShutdownHook extends Thread { + + public void run() { + try { + LOG.info("Processing is shutting down."); + + inputManager.close(); + outputManager.close(); + inputManager.checkInAll(); + + logStats(); + + LOG.info("LogSearch is exiting."); + } catch (Throwable t) { + // Ignore + } + } + } + private void monitor() throws Exception { - inputMgr.monitor(); + inputManager.monitor(); JVMShutdownHook logfeederJVMHook = new JVMShutdownHook(); - ShutdownHookManager.get().addShutdownHook(logfeederJVMHook, - LOGFEEDER_SHUTDOWN_HOOK_PRIORITY); + ShutdownHookManager.get().addShutdownHook(logfeederJVMHook, LOGFEEDER_SHUTDOWN_HOOK_PRIORITY); statLoggerThread = new Thread("statLogger") { @@ -512,17 +443,14 @@ public class LogFeeder { try { logStats(); } catch (Throwable t) { - logger.error( - "LogStats: Caught exception while logging stats.", - t); + LOG.error("LogStats: Caught exception while logging stats.", t); } - if (System.currentTimeMillis() > (lastCheckPointCleanedMS + checkPointCleanIntervalMS)) { + if (System.currentTimeMillis() > (lastCheckPointCleanedMS + CHECKPOINT_CLEAN_INTERVAL_MS)) { lastCheckPointCleanedMS = System.currentTimeMillis(); - inputMgr.cleanCheckPointFiles(); + inputManager.cleanCheckPointFiles(); } - // logfeeder is stopped then break the loop if (isLogfeederCompleted) { break; } @@ -536,84 +464,20 @@ public class LogFeeder { } private void logStats() { - inputMgr.logStats(); - outMgr.logStats(); - - if (metricsMgr.isMetricsEnabled()) { - List<MetricCount> metricsList = new ArrayList<MetricCount>(); - inputMgr.addMetricsContainers(metricsList); - outMgr.addMetricsContainers(metricsList); - metricsMgr.useMetrics(metricsList); - } - } - - private String readFile(BufferedReader br) throws Exception { - try { - StringBuilder sb = new StringBuilder(); - String line = br.readLine(); - while (line != null) { - sb.append(line); - line = br.readLine(); - } - return sb.toString(); - } catch (Exception t) { - logger.error("Error loading properties file.", t); - throw t; - } - } - - public Collection<Output> getOutputList() { - return outputList; - } - - public OutputMgr getOutMgr() { - return outMgr; - } - - public static void main(String[] args) { - LogFeeder logFeeder = new LogFeeder(args); - logFeeder.run(); - } - - public void run() { - try { - Date startTime = new Date(); - this.init(); - Date endTime = new Date(); - logger.info("Took " + (endTime.getTime() - startTime.getTime()) - + " ms to initialize"); - this.monitor(); - //wait for all background thread before stop main thread - this.waitOnAllDaemonThreads(); - } catch (Throwable t) { - logger.fatal("Caught exception in main.", t); - System.exit(1); + inputManager.logStats(); + outputManager.logStats(); + + if (metricsManager.isMetricsEnabled()) { + List<MetricData> metricsList = new ArrayList<MetricData>(); + inputManager.addMetricsContainers(metricsList); + outputManager.addMetricsContainers(metricsList); + metricsManager.useMetrics(metricsList); } } - private class JVMShutdownHook extends Thread { - - public void run() { - try { - logger.info("Processing is shutting down."); - - inputMgr.close(); - outMgr.close(); - inputMgr.checkInAll(); - - logStats(); - - logger.info("LogSearch is exiting."); - } catch (Throwable t) { - // Ignore - } - } - } - private void waitOnAllDaemonThreads() { - String foreground = LogFeederUtil.getStringProperty("foreground"); - if (foreground != null && foreground.equalsIgnoreCase("true")) { - inputMgr.waitOnAllInputs(); + if ("true".equals(LogFeederUtil.getStringProperty("foreground"))) { + inputManager.waitOnAllInputs(); isLogfeederCompleted = true; if (statLoggerThread != null) { try { @@ -624,24 +488,16 @@ public class LogFeeder { } } } - - private String[] getConfigFromCmdLine() { - String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir"); - if (inputConfigDir != null && !inputConfigDir.isEmpty()) { - String[] searchFileWithExtensions = new String[] { "json" }; - File configDirFile = new File(inputConfigDir); - List<File> configFiles = FileUtil.getAllFileFromDir(configDirFile, - searchFileWithExtensions, false); - if (configFiles != null && configFiles.size() > 0) { - String configPaths[] = new String[configFiles.size()]; - for (int index = 0; index < configFiles.size(); index++) { - File configFile = configFiles.get(index); - String configFilePath = configFile.getAbsolutePath(); - configPaths[index] = configFilePath; - } - return configPaths; - } + + public static void main(String[] args) { + try { + LogFeederUtil.loadProperties("logfeeder.properties", args); + } catch (Throwable t) { + LOG.warn("Could not load logfeeder properites"); + System.exit(1); } - return new String[0]; + + LogFeeder logFeeder = new LogFeeder(); + logFeeder.run(); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java index 287982f..47ddc51 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java @@ -23,27 +23,27 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.ambari.logfeeder.metrics.MetricCount; +import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; import org.apache.log4j.Priority; public abstract class ConfigBlock { - static private Logger logger = Logger.getLogger(ConfigBlock.class); + private static final Logger LOG = Logger.getLogger(ConfigBlock.class); private boolean drain = false; protected Map<String, Object> configs; protected Map<String, String> contextFields = new HashMap<String, String>(); - public MetricCount statMetric = new MetricCount(); - - /** - * - */ + public MetricData statMetric = new MetricData(getStatMetricName(), false); + protected String getStatMetricName() { + return null; + } + public ConfigBlock() { - super(); } /** @@ -58,10 +58,7 @@ public abstract class ConfigBlock { return this.getClass().getSimpleName(); } - /** - * @param metricsList - */ - public void addMetricsContainers(List<MetricCount> metricsList) { + public void addMetricsContainers(List<MetricData> metricsList) { metricsList.add(statMetric); } @@ -89,25 +86,21 @@ public abstract class ConfigBlock { boolean isEnabled = getBooleanValue("is_enabled", true); if (isEnabled) { // Let's check for static conditions - Map<String, Object> conditions = (Map<String, Object>) configs - .get("conditions"); + Map<String, Object> conditions = (Map<String, Object>) configs.get("conditions"); boolean allow = true; - if (conditions != null && conditions.size() > 0) { + if (MapUtils.isNotEmpty(conditions)) { allow = false; for (String conditionType : conditions.keySet()) { if (conditionType.equalsIgnoreCase("fields")) { - Map<String, Object> fields = (Map<String, Object>) conditions - .get("fields"); + Map<String, Object> fields = (Map<String, Object>) conditions.get("fields"); for (String fieldName : fields.keySet()) { Object values = fields.get(fieldName); if (values instanceof String) { - allow = isFieldConditionMatch(fieldName, - (String) values); + allow = isFieldConditionMatch(fieldName, (String) values); } else { List<String> listValues = (List<String>) values; for (String stringValue : listValues) { - allow = isFieldConditionMatch(fieldName, - stringValue); + allow = isFieldConditionMatch(fieldName, stringValue); if (allow) { break; } @@ -135,8 +128,7 @@ public abstract class ConfigBlock { allow = true; } else { @SuppressWarnings("unchecked") - Map<String, Object> addFields = (Map<String, Object>) configs - .get("add_fields"); + Map<String, Object> addFields = (Map<String, Object>) configs.get("add_fields"); if (addFields != null && addFields.get(fieldName) != null) { String addFieldValue = (String) addFields.get(fieldName); if (stringValue.equalsIgnoreCase(addFieldValue)) { @@ -184,12 +176,7 @@ public abstract class ConfigBlock { String strValue = getStringValue(key); boolean retValue = defaultValue; if (!StringUtils.isEmpty(strValue)) { - if (strValue.equalsIgnoreCase("true") - || strValue.equalsIgnoreCase("yes")) { - retValue = true; - } else { - retValue = false; - } + retValue = (strValue.equalsIgnoreCase("true") || strValue.equalsIgnoreCase("yes")); } return retValue; } @@ -201,8 +188,7 @@ public abstract class ConfigBlock { try { retValue = Integer.parseInt(strValue); } catch (Throwable t) { - logger.error("Error parsing integer value. key=" + key - + ", value=" + strValue); + LOG.error("Error parsing integer value. key=" + key + ", value=" + strValue); } } return retValue; @@ -215,8 +201,7 @@ public abstract class ConfigBlock { try { retValue = Long.parseLong(strValue); } catch (Throwable t) { - logger.error("Error parsing long value. key=" + key + ", value=" - + strValue); + LOG.error("Error parsing long value. key=" + key + ", value=" + strValue); } } return retValue; @@ -227,29 +212,27 @@ public abstract class ConfigBlock { } public void incrementStat(int count) { - statMetric.count += count; + statMetric.value += count; } - public void logStatForMetric(MetricCount metric, String prefixStr) { - LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" - + getShortDescription()); + public void logStatForMetric(MetricData metric, String prefixStr) { + LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" + getShortDescription()); } - synchronized public void logStat() { + public synchronized void logStat() { logStatForMetric(statMetric, "Stat"); } public boolean logConfgs(Priority level) { - if (level.toInt() == Priority.INFO_INT && !logger.isInfoEnabled()) { + if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) { return false; } - if (level.toInt() == Priority.DEBUG_INT && !logger.isDebugEnabled()) { + if (level.toInt() == Priority.DEBUG_INT && !LOG.isDebugEnabled()) { return false; } - logger.log(level, "Printing configuration Block=" - + getShortDescription()); - logger.log(level, "configs=" + configs); - logger.log(level, "contextFields=" + contextFields); + LOG.log(level, "Printing configuration Block=" + getShortDescription()); + LOG.log(level, "configs=" + configs); + LOG.log(level, "contextFields=" + contextFields); return true; } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java new file mode 100644 index 0000000..d1e7fba --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.common; + +public class LogFeederConstants { + + public static final String ALL = "all"; + public static final String LOGFEEDER_FILTER_NAME = "log_feeder_config"; + public static final String LOG_LEVEL_UNKNOWN = "UNKNOWN"; + + // solr fields + public static final String SOLR_LEVEL = "level"; + public static final String SOLR_COMPONENT = "type"; + public static final String SOLR_HOST = "host"; + + // UserConfig Constants History + public static final String VALUES = "jsons"; + public static final String ROW_TYPE = "rowtype"; + + // S3 Constants + public static final String S3_PATH_START_WITH = "s3://"; + public static final String S3_PATH_SEPARATOR = "/"; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java index ab371f1..684f3c4 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java @@ -29,21 +29,19 @@ import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.mapper.Mapper; -import org.apache.ambari.logfeeder.metrics.MetricCount; -import org.apache.ambari.logfeeder.output.OutputMgr; +import org.apache.ambari.logfeeder.metrics.MetricData; +import org.apache.ambari.logfeeder.output.OutputManager; import org.apache.ambari.logfeeder.util.AliasUtil; -import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_PARAM; -import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_TYPE; +import org.apache.ambari.logfeeder.util.AliasUtil.AliasType; import org.apache.log4j.Logger; import org.apache.log4j.Priority; public abstract class Filter extends ConfigBlock { - private static final Logger logger = Logger.getLogger(Filter.class); + private static final Logger LOG = Logger.getLogger(Filter.class); protected Input input; private Filter nextFilter = null; - private OutputMgr outputMgr; + private OutputManager outputManager; private Map<String, List<Mapper>> postFieldValueMappers = new HashMap<String, List<Mapper>>(); @@ -74,15 +72,12 @@ public abstract class Filter extends ConfigBlock { } for (Map<String, Object> mapObject : mapList) { for (String mapClassCode : mapObject.keySet()) { - Mapper mapper = getMapper(mapClassCode); + Mapper mapper = (Mapper) AliasUtil.getClassInstance(mapClassCode, AliasType.MAPPER); if (mapper == null) { break; } - if (mapper.init(getInput().getShortDescription(), - fieldName, mapClassCode, - mapObject.get(mapClassCode))) { - List<Mapper> fieldMapList = postFieldValueMappers - .get(fieldName); + if (mapper.init(getInput().getShortDescription(), fieldName, mapClassCode, mapObject.get(mapClassCode))) { + List<Mapper> fieldMapList = postFieldValueMappers.get(fieldName); if (fieldMapList == null) { fieldMapList = new ArrayList<Mapper>(); postFieldValueMappers.put(fieldName, fieldMapList); @@ -94,17 +89,8 @@ public abstract class Filter extends ConfigBlock { } } - private Mapper getMapper(String mapClassCode) { - String classFullName = AliasUtil.getInstance().readAlias(mapClassCode, ALIAS_TYPE.MAPPER, ALIAS_PARAM.KLASS); - if (classFullName != null && !classFullName.isEmpty()) { - Mapper mapper = (Mapper) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.MAPPER); - return mapper; - } - return null; - } - - public void setOutputMgr(OutputMgr outputMgr) { - this.outputMgr = outputMgr; + public void setOutputManager(OutputManager outputManager) { + this.outputManager = outputManager; } public Filter getNextFilter() { @@ -131,25 +117,23 @@ public abstract class Filter extends ConfigBlock { if (nextFilter != null) { nextFilter.apply(inputStr, inputMarker); } else { - outputMgr.write(inputStr, inputMarker); + outputManager.write(inputStr, inputMarker); } } public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogfeederException { - if (postFieldValueMappers.size() > 0) { - for (String fieldName : postFieldValueMappers.keySet()) { - Object value = jsonObj.get(fieldName); - if (value != null) { - for (Mapper mapper : postFieldValueMappers.get(fieldName)) { - value = mapper.apply(jsonObj, value); - } + for (String fieldName : postFieldValueMappers.keySet()) { + Object value = jsonObj.get(fieldName); + if (value != null) { + for (Mapper mapper : postFieldValueMappers.get(fieldName)) { + value = mapper.apply(jsonObj, value); } } } if (nextFilter != null) { nextFilter.apply(jsonObj, inputMarker); } else { - outputMgr.write(jsonObj, inputMarker); + outputManager.write(jsonObj, inputMarker); } } @@ -193,16 +177,15 @@ public abstract class Filter extends ConfigBlock { if (!super.logConfgs(level)) { return false; } - logger.log(level, "input=" + input.getShortDescription()); + LOG.log(level, "input=" + input.getShortDescription()); return true; } @Override - public void addMetricsContainers(List<MetricCount> metricsList) { + public void addMetricsContainers(List<MetricData> metricsList) { super.addMetricsContainers(metricsList); if (nextFilter != null) { nextFilter.addMetricsContainers(metricsList); } } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java index 372c208..7e2da70 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java @@ -36,7 +36,7 @@ import oi.thekraken.grok.api.exception.GrokException; import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.input.InputMarker; -import org.apache.ambari.logfeeder.metrics.MetricCount; +import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; @@ -45,7 +45,7 @@ import org.apache.log4j.Logger; import com.google.gson.reflect.TypeToken; public class FilterGrok extends Filter { - static private Logger logger = Logger.getLogger(FilterGrok.class); + private static final Logger LOG = Logger.getLogger(FilterGrok.class); private static final String GROK_PATTERN_FILE = "grok-patterns"; @@ -68,25 +68,23 @@ public class FilterGrok extends Filter { private Type jsonType = new TypeToken<Map<String, String>>() {}.getType(); - private MetricCount grokErrorMetric = new MetricCount(); + private MetricData grokErrorMetric = new MetricData("filter.error.grok", false); @Override public void init() throws Exception { super.init(); try { - grokErrorMetric.metricsName = "filter.error.grok"; messagePattern = escapePattern(getStringValue("message_pattern")); multilinePattern = escapePattern(getStringValue("multiline_pattern")); sourceField = getStringValue("source_field"); removeSourceField = getBooleanValue("remove_source_field", removeSourceField); - logger.info("init() done. grokPattern=" + messagePattern - + ", multilinePattern=" + multilinePattern + ", " - + getShortDescription()); + LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " + + getShortDescription()); if (StringUtils.isEmpty(messagePattern)) { - logger.error("message_pattern is not set for filter."); + LOG.error("message_pattern is not set for filter."); return; } extractNamedParams(messagePattern, namedParamList); @@ -102,9 +100,7 @@ public class FilterGrok extends Filter { grokMultiline.compile(multilinePattern); } } catch (Throwable t) { - logger.fatal( - "Caught exception while initializing Grok. multilinePattern=" - + multilinePattern + ", messagePattern=" + LOG.fatal("Caught exception while initializing Grok. multilinePattern=" + multilinePattern + ", messagePattern=" + messagePattern, t); grokMessage = null; grokMultiline = null; @@ -123,9 +119,10 @@ public class FilterGrok extends Filter { } private void extractNamedParams(String patternStr, Set<String> paramList) { - String grokRegEx = "%\\{" + "(?<name>" + "(?<pattern>[A-z0-9]+)" - + "(?::(?<subname>[A-z0-9_:]+))?" + ")" + "(?:=(?<definition>" - + "(?:" + "(?:[^{}]+|\\.+)+" + ")+" + ")" + ")?" + "\\}"; + String grokRegEx = "%\\{" + + "(?<name>" + "(?<pattern>[A-z0-9]+)" + "(?::(?<subname>[A-z0-9_:]+))?" + ")" + + "(?:=(?<definition>" + "(?:" + "(?:[^{}]+|\\.+)+" + ")+" + ")" + ")?" + + "\\}"; Pattern pattern = Pattern.compile(grokRegEx); java.util.regex.Matcher matcher = pattern.matcher(patternStr); @@ -139,28 +136,23 @@ public class FilterGrok extends Filter { private boolean loadPatterns(Grok grok) { InputStreamReader grokPatternsReader = null; - logger.info("Loading pattern file " + GROK_PATTERN_FILE); + LOG.info("Loading pattern file " + GROK_PATTERN_FILE); try { - BufferedInputStream fileInputStream = (BufferedInputStream) this - .getClass().getClassLoader() - .getResourceAsStream(GROK_PATTERN_FILE); + BufferedInputStream fileInputStream = + (BufferedInputStream) this.getClass().getClassLoader().getResourceAsStream(GROK_PATTERN_FILE); if (fileInputStream == null) { - logger.fatal("Couldn't load grok-patterns file " - + GROK_PATTERN_FILE + ". Things will not work"); + LOG.fatal("Couldn't load grok-patterns file " + GROK_PATTERN_FILE + ". Things will not work"); return false; } grokPatternsReader = new InputStreamReader(fileInputStream); } catch (Throwable t) { - logger.fatal("Error reading grok-patterns file " + GROK_PATTERN_FILE - + " from classpath. Grok filtering will not work.", t); + LOG.fatal("Error reading grok-patterns file " + GROK_PATTERN_FILE + " from classpath. Grok filtering will not work.", t); return false; } try { grok.addPatternFromReader(grokPatternsReader); } catch (GrokException e) { - logger.fatal( - "Error loading patterns from grok-patterns reader for file " - + GROK_PATTERN_FILE, e); + LOG.fatal("Error loading patterns from grok-patterns reader for file " + GROK_PATTERN_FILE, e); return false; } @@ -177,8 +169,7 @@ public class FilterGrok extends Filter { String jsonStr = grokMultiline.capture(inputStr); if (!"{}".equals(jsonStr)) { if (strBuff != null) { - Map<String, Object> jsonObj = Collections - .synchronizedMap(new HashMap<String, Object>()); + Map<String, Object> jsonObj = Collections.synchronizedMap(new HashMap<String, Object>()); try { applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr); } finally { @@ -192,15 +183,13 @@ public class FilterGrok extends Filter { if (strBuff == null) { strBuff = new StringBuilder(); } else { - strBuff.append('\r'); - strBuff.append('\n'); + strBuff.append("\r\n"); } strBuff.append(inputStr); savedInputMarker = inputMarker; } else { savedInputMarker = inputMarker; - Map<String, Object> jsonObj = Collections - .synchronizedMap(new HashMap<String, Object>()); + Map<String, Object> jsonObj = Collections.synchronizedMap(new HashMap<String, Object>()); applyMessage(inputStr, jsonObj, null); } } @@ -216,14 +205,8 @@ public class FilterGrok extends Filter { } } - /** - * @param inputStr - * @param jsonObj - * @throws LogfeederException - */ - private void applyMessage(String inputStr, Map<String, Object> jsonObj, - String multilineJsonStr) throws LogfeederException { - String jsonStr = grokParse(inputStr); + private void applyMessage(String inputStr, Map<String, Object> jsonObj, String multilineJsonStr) throws LogfeederException { + String jsonStr = grokMessage.capture(inputStr); boolean parseError = false; if ("{}".equals(jsonStr)) { @@ -239,8 +222,7 @@ public class FilterGrok extends Filter { if (parseError) { jsonStr = multilineJsonStr; } - Map<String, String> jsonSrc = LogFeederUtil.getGson().fromJson(jsonStr, - jsonType); + Map<String, String> jsonSrc = LogFeederUtil.getGson().fromJson(jsonStr, jsonType); for (String namedParam : namedParamList) { if (jsonSrc.get(namedParam) != null) { jsonObj.put(namedParam, jsonSrc.get(namedParam)); @@ -260,37 +242,26 @@ public class FilterGrok extends Filter { } } super.apply(jsonObj, savedInputMarker); - statMetric.count++; - } - - public String grokParse(String inputStr) { - String jsonStr = grokMessage.capture(inputStr); - return jsonStr; + statMetric.value++; } private void logParseError(String inputStr) { - grokErrorMetric.count++; - final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() - + "_PARSEERROR"; + grokErrorMetric.value++; + String logMessageKey = this.getClass().getSimpleName() + "_PARSEERROR"; int inputStrLength = inputStr != null ? inputStr.length() : 0; - LogFeederUtil.logErrorMessageByInterval( - LOG_MESSAGE_KEY, - "Error parsing string. length=" + inputStrLength - + ", input=" + input.getShortDescription() - + ". First upto 100 characters=" - + LogFeederUtil.subString(inputStr, 100), null, logger, - Level.WARN); + LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error parsing string. length=" + inputStrLength + ", input=" + + input.getShortDescription() + ". First upto 100 characters=" + StringUtils.abbreviate(inputStr, 100), null, LOG, + Level.WARN); } @Override public void flush() { if (strBuff != null) { - Map<String, Object> jsonObj = Collections - .synchronizedMap(new HashMap<String, Object>()); + Map<String, Object> jsonObj = Collections.synchronizedMap(new HashMap<String, Object>()); try { applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr); } catch (LogfeederException e) { - logger.error(e.getLocalizedMessage(), e.getCause()); + LOG.error(e.getLocalizedMessage(), e.getCause()); } strBuff = null; savedInputMarker = null; @@ -304,7 +275,7 @@ public class FilterGrok extends Filter { } @Override - public void addMetricsContainers(List<MetricCount> metricsList) { + public void addMetricsContainers(List<MetricData> metricsList) { super.addMetricsContainers(metricsList); metricsList.add(grokErrorMetric); } @@ -314,5 +285,4 @@ public class FilterGrok extends Filter { super.logStat(); logStatForMetric(grokErrorMetric, "Stat: Grok Errors"); } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java index 2954106..ba63c61 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java @@ -22,12 +22,13 @@ import java.util.Map; import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.util.DateUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.log4j.Logger; public class FilterJSON extends Filter { - private static final Logger logger = Logger.getLogger(FilterJSON.class); + private static final Logger LOG = Logger.getLogger(FilterJSON.class); @Override public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException { @@ -35,7 +36,7 @@ public class FilterJSON extends Filter { try { jsonMap = LogFeederUtil.toJSONObject(inputStr); } catch (Exception e) { - logger.error(e.getLocalizedMessage()); + LOG.error(e.getLocalizedMessage()); throw new LogfeederException("Json parsing failed for inputstr = " + inputStr ,e.getCause()); } Double lineNumberD = (Double) jsonMap.get("line_number"); @@ -45,10 +46,9 @@ public class FilterJSON extends Filter { } String timeStampStr = (String) jsonMap.get("logtime"); if (timeStampStr != null && !timeStampStr.isEmpty()) { - String logtime = LogFeederUtil.getDate(timeStampStr); + String logtime = DateUtil.getDate(timeStampStr); jsonMap.put("logtime", logtime); } super.apply(jsonMap, inputMarker); } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java index 7adb468..c9c3f2c 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java @@ -25,38 +25,35 @@ import java.util.StringTokenizer; import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.input.InputMarker; -import org.apache.ambari.logfeeder.metrics.MetricCount; +import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; public class FilterKeyValue extends Filter { - private static final Logger logger = Logger.getLogger(FilterKeyValue.class); + private static final Logger LOG = Logger.getLogger(FilterKeyValue.class); private String sourceField = null; private String valueSplit = "="; private String fieldSplit = "\t"; - private MetricCount errorMetric = new MetricCount(); + private MetricData errorMetric = new MetricData("filter.error.keyvalue", false); @Override public void init() throws Exception { super.init(); - errorMetric.metricsName = "filter.error.keyvalue"; sourceField = getStringValue("source_field"); valueSplit = getStringValue("value_split", valueSplit); fieldSplit = getStringValue("field_split", fieldSplit); - logger.info("init() done. source_field=" + sourceField - + ", value_split=" + valueSplit + ", " + ", field_split=" - + fieldSplit + ", " + getShortDescription()); + LOG.info("init() done. source_field=" + sourceField + ", value_split=" + valueSplit + ", " + ", field_split=" + + fieldSplit + ", " + getShortDescription()); if (StringUtils.isEmpty(sourceField)) { - logger.fatal("source_field is not set for filter. This filter will not be applied"); + LOG.fatal("source_field is not set for filter. This filter will not be applied"); return; } - } @Override @@ -71,40 +68,30 @@ public class FilterKeyValue extends Filter { } Object valueObj = jsonObj.get(sourceField); if (valueObj != null) { - StringTokenizer fieldTokenizer = new StringTokenizer( - valueObj.toString(), fieldSplit); + StringTokenizer fieldTokenizer = new StringTokenizer(valueObj.toString(), fieldSplit); while (fieldTokenizer.hasMoreTokens()) { String nv = fieldTokenizer.nextToken(); - StringTokenizer nvTokenizer = new StringTokenizer(nv, - valueSplit); + StringTokenizer nvTokenizer = new StringTokenizer(nv, valueSplit); while (nvTokenizer.hasMoreTokens()) { String name = nvTokenizer.nextToken(); if (nvTokenizer.hasMoreTokens()) { String value = nvTokenizer.nextToken(); jsonObj.put(name, value); } else { - logParseError("name=" + name + ", pair=" + nv - + ", field=" + sourceField + ", field_value=" - + valueObj); + logParseError("name=" + name + ", pair=" + nv + ", field=" + sourceField + ", field_value=" + valueObj); } } } } super.apply(jsonObj, inputMarker); - statMetric.count++; + statMetric.value++; } private void logParseError(String inputStr) { - errorMetric.count++; - final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() - + "_PARSEERROR"; - LogFeederUtil - .logErrorMessageByInterval( - LOG_MESSAGE_KEY, - "Error parsing string. length=" + inputStr.length() - + ", input=" + input.getShortDescription() - + ". First upto 100 characters=" - + LogFeederUtil.subString(inputStr, 100), null, logger, + errorMetric.value++; + String logMessageKey = this.getClass().getSimpleName() + "_PARSEERROR"; + LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error parsing string. length=" + inputStr.length() + ", input=" + + input.getShortDescription() + ". First upto 100 characters=" + StringUtils.abbreviate(inputStr, 100), null, LOG, Level.ERROR); } @@ -114,9 +101,8 @@ public class FilterKeyValue extends Filter { } @Override - public void addMetricsContainers(List<MetricCount> metricsList) { + public void addMetricsContainers(List<MetricData> metricsList) { super.addMetricsContainers(metricsList); metricsList.add(errorMetric); } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java new file mode 100644 index 0000000..41a1fa5 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.input; + +import java.io.BufferedReader; +import java.io.EOFException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +public abstract class AbstractInputFile extends Input { + protected static final Logger LOG = Logger.getLogger(AbstractInputFile.class); + + private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 1000; + + protected File[] logFiles; + protected String logPath; + protected Object fileKey; + protected String base64FileKey; + + protected boolean isReady; + private boolean isStartFromBegining = true; + + private String checkPointExtension; + private File checkPointFile; + private RandomAccessFile checkPointWriter; + private long lastCheckPointTimeMS; + private int checkPointIntervalMS; + private Map<String, Object> jsonCheckPoint; + private InputMarker lastCheckPointInputMarker; + + @Override + protected String getStatMetricName() { + return "input.files.read_lines"; + } + + @Override + protected String getReadBytesMetricName() { + return "input.files.read_bytes"; + } + + @Override + public void init() throws Exception { + LOG.info("init() called"); + + checkPointExtension = LogFeederUtil.getStringProperty("logfeeder.checkpoint.extension", InputManager.DEFAULT_CHECKPOINT_EXTENSION); + + // Let's close the file and set it to true after we start monitoring it + setClosed(true); + logPath = getStringValue("path"); + tail = getBooleanValue("tail", tail); + checkPointIntervalMS = getIntValue("checkpoint.interval.ms", DEFAULT_CHECKPOINT_INTERVAL_MS); + + if (StringUtils.isEmpty(logPath)) { + LOG.error("path is empty for file input. " + getShortDescription()); + return; + } + + String startPosition = getStringValue("start_position"); + if (StringUtils.isEmpty(startPosition) || startPosition.equalsIgnoreCase("beginning") || + startPosition.equalsIgnoreCase("begining") || !tail) { + isStartFromBegining = true; + } + + setFilePath(logPath); + boolean isFileReady = isReady(); + + LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady); + + super.init(); + } + + protected void processFile(File logPathFile) throws FileNotFoundException, IOException { + LOG.info("Monitoring logPath=" + logPath + ", logPathFile=" + logPathFile); + BufferedReader br = null; + checkPointFile = null; + checkPointWriter = null; + jsonCheckPoint = null; + + int lineCount = 0; + try { + setFilePath(logPathFile.getAbsolutePath()); + + br = openLogFile(logPathFile); + + boolean resume = isStartFromBegining; + int resumeFromLineNumber = getResumeFromLineNumber(); + if (resumeFromLineNumber > 0) { + resume = false; + } + + setClosed(false); + int sleepStep = 2; + int sleepIteration = 0; + while (true) { + try { + if (isDrain()) { + break; + } + + String line = br.readLine(); + if (line == null) { + if (!resume) { + resume = true; + } + sleepIteration++; + if (sleepIteration == 2) { + flush(); + if (!tail) { + LOG.info("End of file. Done with filePath=" + logPathFile.getAbsolutePath() + ", lineCount=" + lineCount); + break; + } + } else if (sleepIteration > 4) { + Object newFileKey = getFileKey(logPathFile); + if (newFileKey != null && (fileKey == null || !newFileKey.equals(fileKey))) { + LOG.info("File key is different. Marking this input file for rollover. oldKey=" + fileKey + ", newKey=" + + newFileKey + ". " + getShortDescription()); + + try { + LOG.info("File is rolled over. Closing current open file." + getShortDescription() + ", lineCount=" + + lineCount); + br.close(); + } catch (Exception ex) { + LOG.error("Error closing file" + getShortDescription(), ex); + break; + } + + try { + LOG.info("Opening new rolled over file." + getShortDescription()); + br = openLogFile(logPathFile); + lineCount = 0; + } catch (Exception ex) { + LOG.error("Error opening rolled over file. " + getShortDescription(), ex); + LOG.info("Added input to not ready list." + getShortDescription()); + isReady = false; + inputManager.addToNotReady(this); + break; + } + LOG.info("File is successfully rolled over. " + getShortDescription()); + continue; + } + } + try { + Thread.sleep(sleepStep * 1000); + sleepStep = Math.min(sleepStep * 2, 10); + } catch (InterruptedException e) { + LOG.info("Thread interrupted." + getShortDescription()); + } + } else { + lineCount++; + sleepStep = 1; + sleepIteration = 0; + + if (!resume && lineCount > resumeFromLineNumber) { + LOG.info("Resuming to read from last line. lineCount=" + lineCount + ", input=" + getShortDescription()); + resume = true; + } + if (resume) { + InputMarker marker = new InputMarker(this, base64FileKey, lineCount); + outputLine(line, marker); + } + } + } catch (Throwable t) { + String logMessageKey = this.getClass().getSimpleName() + "_READ_LOOP_EXCEPTION"; + LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception in read loop. lineNumber=" + lineCount + + ", input=" + getShortDescription(), t, LOG, Level.ERROR); + } + } + } finally { + if (br != null) { + LOG.info("Closing reader." + getShortDescription() + ", lineCount=" + lineCount); + try { + br.close(); + } catch (Throwable t) { + // ignore + } + } + } + } + + protected abstract BufferedReader openLogFile(File logFile) throws IOException; + + protected abstract Object getFileKey(File logFile); + + private int getResumeFromLineNumber() { + int resumeFromLineNumber = 0; + + if (tail) { + try { + LOG.info("Checking existing checkpoint file. " + getShortDescription()); + + String checkPointFileName = base64FileKey + checkPointExtension; + File checkPointFolder = inputManager.getCheckPointFolderFile(); + checkPointFile = new File(checkPointFolder, checkPointFileName); + checkPointWriter = new RandomAccessFile(checkPointFile, "rw"); + + try { + int contentSize = checkPointWriter.readInt(); + byte b[] = new byte[contentSize]; + int readSize = checkPointWriter.read(b, 0, contentSize); + if (readSize != contentSize) { + LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" + + readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription()); + } else { + String jsonCheckPointStr = new String(b, 0, readSize); + jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr); + + resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number"); + + LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr + + ", resumeFromLineNumber=" + resumeFromLineNumber); + } + } catch (EOFException eofEx) { + LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " + + getShortDescription()); + } + if (jsonCheckPoint == null) { + // This seems to be first time, so creating the initial checkPoint object + jsonCheckPoint = new HashMap<String, Object>(); + jsonCheckPoint.put("file_path", filePath); + jsonCheckPoint.put("file_key", base64FileKey); + } + + } catch (Throwable t) { + LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t); + } + } + + return resumeFromLineNumber; + } + + @Override + public synchronized void checkIn(InputMarker inputMarker) { + if (checkPointWriter != null) { + try { + int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number"); + if (lineNumber > inputMarker.lineNumber) { + // Already wrote higher line number for this input + return; + } + // If interval is greater than last checkPoint time, then write + long currMS = System.currentTimeMillis(); + if (!isClosed() && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) { + // Let's save this one so we can update the check point file on flush + lastCheckPointInputMarker = inputMarker; + return; + } + lastCheckPointTimeMS = currMS; + + jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber)); + jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS)); + jsonCheckPoint.put("last_write_time_date", new Date()); + + String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint); + + // Let's rewind + checkPointWriter.seek(0); + checkPointWriter.writeInt(jsonStr.length()); + checkPointWriter.write(jsonStr.getBytes()); + + if (isClosed()) { + String logMessageKey = this.getClass().getSimpleName() + "_FINAL_CHECKIN"; + LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final checkPoint, input=" + getShortDescription() + + ", checkPointFile=" + checkPointFile.getAbsolutePath() + ", checkPoint=" + jsonStr, null, LOG, Level.INFO); + } + } catch (Throwable t) { + String logMessageKey = this.getClass().getSimpleName() + "_CHECKIN_EXCEPTION"; + LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception checkIn. , input=" + getShortDescription(), t, + LOG, Level.ERROR); + } + } + } + + @Override + public void lastCheckIn() { + if (lastCheckPointInputMarker != null) { + checkIn(lastCheckPointInputMarker); + } + } + + @Override + public void close() { + super.close(); + LOG.info("close() calling checkPoint checkIn(). " + getShortDescription()); + lastCheckIn(); + } + + @Override + public String getShortDescription() { + return "input:source=" + getStringValue("source") + ", path=" + + (!ArrayUtils.isEmpty(logFiles) ? logFiles[0].getAbsolutePath() : logPath); + } +}
