http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java new file mode 100644 index 0000000..445c294 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java @@ -0,0 +1,545 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.apache.ambari.logfeeder;

import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import static java.nio.file.StandardWatchEventKinds.*;

import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputFile;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.log4j.Logger;
import org.apache.solr.common.util.Base64;

/**
 * Owns the configured {@link Input}s: initializes them, starts their
 * monitoring, retries inputs that are not ready yet, watches parent folders
 * for newly created (rolled over) files, cleans stale checkpoint files and
 * drains the inputs on shutdown.
 */
public class InputMgr {
  static Logger logger = Logger.getLogger(InputMgr.class);

  /** Pause between two passes of the "not ready" monitor thread. */
  private static final long NOT_READY_POLL_INTERVAL_MS = 30 * 1000L;

  List<Input> inputList = new ArrayList<Input>();

  // Iterated by the "InputIsReadyMonitor" thread while addToNotReady() can be
  // called concurrently, hence a concurrent set instead of a plain HashSet.
  Set<Input> notReadyList = Collections
    .newSetFromMap(new ConcurrentHashMap<Input, Boolean>());

  WatchService folderWatcher = null;
  // Absolute folders already registered with folderWatcher. Only touched
  // inside the synchronized monitorSystemFileChanges().
  Set<File> foldersToMonitor = new HashSet<File>();
  // Absolute file path -> input. Written under the monitor lock and read by
  // the "FileSystemWatcher" thread, hence a concurrent map.
  Map<String, Input> filesToMonitor = new ConcurrentHashMap<String, Input>();

  // Set by close() (called from the JVM shutdown hook) and polled by the
  // monitor thread, so it must be volatile to be visible across threads.
  volatile boolean isDrain = false;
  boolean isAnyInputTail = false;

  private String checkPointSubFolderName = "logfeeder_checkpoints";
  File checkPointFolderFile = null;

  MetricCount filesCountMetric = new MetricCount();

  private String checkPointExtension = ".cp";

  /**
   * @return the live list of managed inputs (not a copy)
   */
  public List<Input> getInputList() {
    return inputList;
  }

  /**
   * Registers an input with this manager.
   *
   * @param input the input to manage
   */
  public void add(Input input) {
    inputList.add(input);
  }

  /**
   * Removes every entry of the input list that equals the given input.
   *
   * @param input the input to remove
   */
  public void removeInput(Input input) {
    logger.info("Trying to remove from inputList. "
      + input.getShortDescription());
    Iterator<Input> iter = inputList.iterator();
    while (iter.hasNext()) {
      Input candidate = iter.next();
      if (candidate.equals(input)) {
        logger.info("Removing Input from inputList. "
          + input.getShortDescription());
        iter.remove();
      }
    }
  }

  /**
   * @return the number of managed inputs that currently report ready
   */
  public int getActiveFilesCount() {
    int count = 0;
    for (Input input : inputList) {
      if (input.isReady()) {
        count++;
      }
    }
    return count;
  }

  /**
   * Initializes the metric, reads the checkpoint extension property,
   * initializes every input and, when at least one input is in tail mode,
   * selects the folder used to store checkpoints.
   */
  public void init() {
    filesCountMetric.metricsName = "input.files.count";
    filesCountMetric.isPointInTime = true;

    checkPointExtension = LogFeederUtil.getStringProperty(
      "logfeeder.checkpoint.extension", checkPointExtension);
    for (Input input : inputList) {
      try {
        input.init();
        if (input.isTail()) {
          isAnyInputTail = true;
        }
      } catch (Exception e) {
        logger.error(
          "Error initializing input. " + input.getShortDescription(), e);
      }
    }

    if (isAnyInputTail) {
      resolveCheckPointFolder();
    }
  }

  /**
   * Picks the checkpoint folder: the configured folder first, then a sub
   * folder of the home folder, then a sub folder of the tmp folder. The first
   * candidate that passes {@link #verifyCheckPointFolder(File)} wins.
   */
  private void resolveCheckPointFolder() {
    logger.info("Determining valid checkpoint folder");
    boolean isCheckPointFolderValid = false;
    // We need to keep track of the files we are reading.
    String checkPointFolder = LogFeederUtil
      .getStringProperty("logfeeder.checkpoint.folder");
    if (checkPointFolder != null && !checkPointFolder.isEmpty()) {
      checkPointFolderFile = new File(checkPointFolder);
      isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
    }
    if (!isCheckPointFolderValid) {
      // Let's try home folder
      String userHome = LogFeederUtil.getStringProperty("user.home");
      if (userHome != null) {
        checkPointFolderFile = new File(userHome, checkPointSubFolderName);
        logger.info("Checking if home folder can be used for checkpoints. Folder="
          + checkPointFolderFile);
        isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
      }
    }
    if (!isCheckPointFolderValid) {
      // Let's use tmp folder
      String tmpFolder = LogFeederUtil.getStringProperty("java.io.tmpdir");
      if (tmpFolder == null) {
        tmpFolder = "/tmp";
      }
      checkPointFolderFile = new File(tmpFolder, checkPointSubFolderName);
      logger.info("Checking if tmps folder can be used for checkpoints. Folder="
        + checkPointFolderFile);
      isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
      if (isCheckPointFolderValid) {
        logger.warn("Using tmp folder "
          + checkPointFolderFile
          + " to store check points. This is not recommended."
          + "Please set logfeeder.checkpoint.folder property");
      }
    }

    if (isCheckPointFolderValid) {
      logger.warn("Using folder " + checkPointFolderFile
        + " for storing checkpoints");
    } else {
      // Previously this case was silent, leaving an unusable folder behind.
      logger.error("No usable folder found for storing checkpoints. Last tried="
        + checkPointFolderFile);
    }
  }

  /**
   * @return the folder selected by {@link #init()} for checkpoints, or
   *         {@code null} when none was selected
   */
  public File getCheckPointFolderFile() {
    return checkPointFolderFile;
  }

  /**
   * Checks that the folder exists (creating it, including missing parents,
   * when necessary) and that a file can be created and deleted inside it.
   *
   * @param folderPathFile candidate checkpoint folder
   * @return {@code true} when a probe file could be created and deleted
   */
  boolean verifyCheckPointFolder(File folderPathFile) {
    if (!folderPathFile.exists()) {
      // Create the folder. mkdirs() also creates missing parent folders,
      // which mkdir() would refuse to do.
      try {
        if (!folderPathFile.mkdirs()) {
          logger.warn("Error creating folder for check point. folder="
            + folderPathFile);
        }
      } catch (Throwable t) {
        logger.warn("Error creating folder for check point. folder="
          + folderPathFile, t);
      }
    }

    if (folderPathFile.exists() && folderPathFile.isDirectory()) {
      // Let's check whether we can create a file
      File testFile = new File(folderPathFile, UUID.randomUUID().toString());
      try {
        testFile.createNewFile();
        return testFile.delete();
      } catch (IOException | SecurityException e) {
        logger.warn(
          "Couldn't create test file in "
            + folderPathFile.getAbsolutePath()
            + " for checkPoint", e);
      }
    }
    return false;
  }

  /**
   * Starts monitoring every ready input. Tail inputs that are not ready are
   * parked in the not-ready list, and a background thread retries them every
   * 30 seconds until {@link #close()} is called.
   */
  public void monitor() {
    for (Input input : inputList) {
      if (input.isReady()) {
        input.monitor();
      } else if (input.isTail()) {
        logger.info("Adding input to not ready list. Note, it is possible this component is not run on this host. So it might not be an issue. "
          + input.getShortDescription());
        notReadyList.add(input);
      } else {
        logger.info("Input is not ready, so going to ignore it "
          + input.getShortDescription());
      }
    }
    // Start the monitoring thread if any file is in tail mode
    if (isAnyInputTail) {
      Thread monitorThread = new Thread("InputIsReadyMonitor") {
        @Override
        public void run() {
          logger.info("Going to monitor for these missing files: "
            + notReadyList.toString());
          while (!isDrain) {
            try {
              startReadyInputs();
            } catch (Throwable t) {
              logger.error("Error while checking not ready inputs", t);
            }
            // Sleep outside of the try above so that a failing pass can
            // never turn this loop into a busy spin.
            try {
              Thread.sleep(NOT_READY_POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              break;
            }
          }
          logger.info("Exiting missing file monitor.");
        }
      };
      monitorThread.start();
    }
  }

  /**
   * One pass over the not-ready list: starts monitoring for every input that
   * became ready and drops it from the list.
   */
  private void startReadyInputs() {
    Iterator<Input> iter = notReadyList.iterator();
    while (iter.hasNext()) {
      Input input = iter.next();
      try {
        if (input.isReady()) {
          input.monitor();
          iter.remove();
        }
      } catch (Throwable t) {
        logger.error("Error while enabling monitoring for input. "
          + input.getShortDescription(), t);
      }
    }
  }

  /**
   * Parks an input so the monitor thread retries it later.
   *
   * @param notReadyInput the input to retry; {@code null} is ignored
   */
  public void addToNotReady(Input notReadyInput) {
    if (notReadyInput == null) {
      logger.warn("Ignoring request to add null input to not ready list");
      return;
    }
    notReadyList.add(notReadyInput);
  }

  /**
   * Lets every input add its metrics to the list and appends the
   * active-files-count metric.
   *
   * @param metricsList list the metrics are appended to
   */
  public void addMetricsContainers(List<MetricCount> metricsList) {
    for (Input input : inputList) {
      input.addMetricsContainers(metricsList);
    }
    filesCountMetric.count = getActiveFilesCount();
    metricsList.add(filesCountMetric);
  }

  /**
   * Logs the statistics of every input followed by the active files count.
   */
  public void logStats() {
    for (Input input : inputList) {
      input.logStat();
    }

    filesCountMetric.count = getActiveFilesCount();
    LogFeederUtil.logStatForMetric(filesCountMetric,
      "Stat: Files Monitored Count", null);
  }

  /**
   * Asks every input to drain and then waits for them to report closed,
   * checking once per second for at most 30 seconds in total.
   */
  public void close() {
    for (Input input : inputList) {
      try {
        input.setDrain(true);
      } catch (Throwable t) {
        logger.error(
          "Error while draining. input=" + input.getShortDescription(), t);
      }
    }
    isDrain = true;

    // Need to get this value from property
    int iterations = 30;
    int waitTimeMS = 1000;
    int i;
    boolean allClosed = true;
    for (i = 0; i < iterations; i++) {
      allClosed = true;
      for (Input input : inputList) {
        if (!input.isClosed()) {
          allClosed = false;
          logger.warn("Waiting for input to close. "
            + input.getShortDescription() + ", "
            + (iterations - i) + " more seconds");
        }
      }
      if (allClosed) {
        break;
      }
      // Sleep once per pass (not once per unclosed input) so the total wait
      // matches the "more seconds" figure logged above.
      try {
        Thread.sleep(waitTimeMS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    if (!allClosed) {
      logger.warn("Some inputs were not closed. Iterations=" + i);
      for (Input input : inputList) {
        if (!input.isClosed()) {
          logger.warn("Input not closed. Will ignore it."
            + input.getShortDescription());
        }
      }
    } else {
      logger.info("All inputs are closed. Iterations=" + i);
    }
  }

  /**
   * Calls {@code checkIn()} on every input.
   */
  public void checkInAll() {
    for (Input input : inputList) {
      input.checkIn();
    }
  }

  /**
   * Deletes checkpoint files whose log file no longer exists or whose file
   * key no longer matches the key recorded in the checkpoint. Never throws;
   * failures are logged.
   */
  public void cleanCheckPointFiles() {
    if (checkPointFolderFile == null) {
      logger.info("Will not clean checkPoint files. checkPointFolderFile="
        + checkPointFolderFile);
      return;
    }
    logger.info("Cleaning checkPoint files. checkPointFolderFile="
      + checkPointFolderFile.getAbsolutePath());
    try {
      String searchPath = "*" + checkPointExtension;
      FileFilter fileFilter = new WildcardFileFilter(searchPath);
      File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
      if (checkPointFiles == null) {
        // listFiles() returns null when the folder is missing or unreadable.
        logger.warn("Couldn't list checkPoint files. checkPointFolderFile="
          + checkPointFolderFile.getAbsolutePath());
        return;
      }
      int totalCheckFilesDeleted = 0;
      for (File checkPointFile : checkPointFiles) {
        String staleLogFilePath = findStaleLogFilePath(checkPointFile);
        if (staleLogFilePath == null) {
          continue;
        }
        // The reader is already closed here; deleting a file that is still
        // open fails on some platforms.
        logger.info("Deleting CheckPoint file="
          + checkPointFile.getAbsolutePath()
          + ", logFile=" + staleLogFilePath);
        if (checkPointFile.delete()) {
          totalCheckFilesDeleted++;
        } else {
          logger.warn("Couldn't delete CheckPoint file="
            + checkPointFile.getAbsolutePath());
        }
      }
      logger.info("Deleted " + totalCheckFilesDeleted
        + " checkPoint file(s). checkPointFolderFile="
        + checkPointFolderFile.getAbsolutePath());
    } catch (Throwable t) {
      logger.error("Error while cleaning checkPointFiles", t);
    }
  }

  /**
   * Reads one checkpoint file (an int length followed by that many bytes of
   * JSON) and decides whether it is stale.
   *
   * @param checkPointFile checkpoint file to inspect
   * @return the log file path recorded in the checkpoint when the checkpoint
   *         should be deleted, otherwise {@code null}
   */
  private String findStaleLogFilePath(File checkPointFile) {
    try (RandomAccessFile checkPointReader = new RandomAccessFile(
      checkPointFile, "r")) {
      int contentSize = checkPointReader.readInt();
      if (contentSize < 0 || contentSize > checkPointReader.length()) {
        // A corrupt header must not trigger a huge or negative allocation.
        logger.error("Invalid content size in checkpoint file. contentSize="
          + contentSize + ", checkPointFile=" + checkPointFile);
        return null;
      }
      byte[] b = new byte[contentSize];
      int readSize = checkPointReader.read(b, 0, contentSize);
      if (readSize != contentSize) {
        logger.error("Couldn't read expected number of bytes from checkpoint file. expected="
          + contentSize
          + ", read="
          + readSize
          + ", checkPointFile=" + checkPointFile);
        return null;
      }
      // NOTE(review): decoded with the platform charset, as before; the code
      // writing the checkpoint is not visible here - confirm both sides agree.
      String jsonCheckPointStr = new String(b, 0, readSize);
      Map<String, Object> jsonCheckPoint = LogFeederUtil
        .toJSONObject(jsonCheckPointStr);
      if (jsonCheckPoint == null) {
        return null;
      }

      String logFilePath = (String) jsonCheckPoint.get("file_path");
      String logFileKey = (String) jsonCheckPoint.get("file_key");
      if (logFilePath == null || logFileKey == null) {
        return null;
      }

      File logFile = new File(logFilePath);
      if (!logFile.exists()) {
        logger.info("CheckPoint clean: Log file doesn't exist. filePath="
          + logFilePath
          + ", checkPointFile="
          + checkPointFile.getAbsolutePath());
        return logFilePath;
      }

      Object fileKeyObj = InputFile.getFileKey(logFile);
      String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString()
        .getBytes());
      if (!logFileKey.equals(fileBase64)) {
        logger.info("CheckPoint clean: File key has changed. old="
          + logFileKey
          + ", new="
          + fileBase64
          + ", filePath="
          + logFilePath
          + ", checkPointFile="
          + checkPointFile.getAbsolutePath());
        return logFilePath;
      }
    } catch (EOFException eof) {
      logger.warn("Caught EOFException. Ignoring reading existing checkPoint file. "
        + checkPointFile);
    } catch (Throwable t) {
      logger.error("Error while checking checkPoint file. "
        + checkPointFile, t);
    }
    return null;
  }

  /**
   * Registers the parent folder of the input's file with the watch service
   * (creating the service and its daemon thread on first use) so that a newly
   * created file with the same path triggers {@code rollOver()} on the input.
   *
   * @param inputToMonitor input whose file path should be watched
   */
  synchronized public void monitorSystemFileChanges(Input inputToMonitor) {
    try {
      File fileToMonitor = new File(inputToMonitor.getFilePath());
      if (filesToMonitor.containsKey(fileToMonitor.getAbsolutePath())) {
        logger.info("Already monitoring file " + fileToMonitor
          + ". So ignoring this request");
        return;
      }

      // make a new watch service that we can register interest in
      // directories and files with.
      if (folderWatcher == null) {
        folderWatcher = FileSystems.getDefault().newWatchService();
        // start the file watcher thread below
        Thread th = new Thread(new FileSystemMonitor(), "FileSystemWatcher");
        th.setDaemon(true);
        th.start();
      }

      // Resolve against the absolute file so that a relative file path still
      // has a parent folder to watch.
      File folderToWatch = fileToMonitor.getAbsoluteFile().getParentFile();
      if (folderToWatch != null) {
        // foldersToMonitor holds File objects; the lookup must use a File
        // too (a String lookup can never match).
        if (foldersToMonitor.contains(folderToWatch)) {
          logger.info("Already monitoring folder " + folderToWatch
            + ". So ignoring this request.");
        } else {
          logger.info("Configuring to monitor folder "
            + folderToWatch + " for file " + fileToMonitor);
          Path toWatch = Paths.get(folderToWatch.getAbsolutePath());
          toWatch.register(folderWatcher, ENTRY_CREATE);
          foldersToMonitor.add(folderToWatch);
        }
        filesToMonitor.put(fileToMonitor.getAbsolutePath(), inputToMonitor);
      } else {
        logger.error("File doesn't have parent folder." + fileToMonitor);
      }
    } catch (IOException e) {
      logger.error("Error while trying to set watcher for file:"
        + inputToMonitor, e);
    }
  }

  /**
   * Body of the "FileSystemWatcher" thread: blocks on the watch service and
   * calls {@code rollOver()} on the input registered for each created file.
   */
  class FileSystemMonitor implements Runnable {
    @Override
    public void run() {
      try {
        // get the first event before looping
        WatchKey key = folderWatcher.take();
        while (key != null) {
          Path dir = (Path) key.watchable();
          // we have a polled event, now we traverse it and
          // receive all the states from it
          for (WatchEvent<?> event : key.pollEvents()) {
            if (!event.kind().equals(ENTRY_CREATE)) {
              logger.info("Ignoring event.kind=" + event.kind());
              continue;
            }
            logger.info("Received " + event.kind()
              + " event for file " + event.context());

            File newFile = new File(dir.toFile(), event.context().toString());
            Input rolledOverInput = filesToMonitor.get(newFile
              .getAbsolutePath());
            if (rolledOverInput == null) {
              logger.info("Input not found for file " + newFile);
            } else {
              try {
                rolledOverInput.rollOver();
              } catch (Throwable t) {
                // One failing input must not stop rollover detection for
                // every other file.
                logger.error("Error while rolling over input for file "
                  + newFile, t);
              }
            }
          }
          if (!key.reset()) {
            logger.error("Error while key.reset(). Will have to abort watching files. Rollover will not work.");
            break;
          }
          key = folderWatcher.take();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        logger.info("Stop request for thread");
      } catch (ClosedWatchServiceException e) {
        logger.info("Watch service is closed");
      }
      logger.info("Exiting FileSystemMonitor thread.");
    }

  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java new file mode 100644 index 0000000..f4ca51b --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -0,0 +1,570 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.apache.ambari.logfeeder;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.ambari.logfeeder.AliasUtil.ALIAS_PARAM;
import org.apache.ambari.logfeeder.AliasUtil.ALIAS_TYPE;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import com.google.gson.reflect.TypeToken;

/**
 * Entry point of LogFeeder: loads the properties and the JSON config files,
 * builds the outputs, inputs and per-input filter chains from the merged
 * config, starts monitoring and periodically logs statistics and cleans
 * checkpoint files.
 */
public class LogFeeder {
  static Logger logger = Logger.getLogger(LogFeeder.class);

  /** Pause between two statistics passes of the "statLogger" thread. */
  private static final long STAT_LOG_INTERVAL_MS = 30 * 1000L;

  Collection<Output> outputList = new ArrayList<Output>();

  OutputMgr outMgr = new OutputMgr();
  InputMgr inputMgr = new InputMgr();
  MetricsMgr metricsMgr = new MetricsMgr();

  Map<String, Object> globalMap = null;
  String[] inputParams;

  List<Map<String, Object>> globalConfigList = new ArrayList<Map<String, Object>>();
  List<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>();
  List<Map<String, Object>> filterConfigList = new ArrayList<Map<String, Object>>();
  List<Map<String, Object>> outputConfigList = new ArrayList<Map<String, Object>>();

  // 24 hours = 86,400,000 ms, which fits in an int. The previous expression
  // had one "* 60" too many and silently overflowed int arithmetic.
  int checkPointCleanIntervalMS = 24 * 60 * 60 * 1000; // 24 hours
  long lastCheckPointCleanedMS = 0;

  /**
   * @param args command line arguments, passed on to the properties loader
   */
  public LogFeeder(String[] args) {
    inputParams = args;
  }

  /**
   * Loads the properties and every config file named by {@code config.files}
   * (falling back to {@code config.file}, default {@code config.json}), then
   * builds and initializes outputs, inputs and metrics and starts the
   * scheduler.
   *
   * @throws Throwable when a config file cannot be loaded or a component
   *           fails to initialize
   */
  public void init() throws Throwable {

    // Load properties
    LogFeederUtil.loadProperties("logfeeder.properties", inputParams);

    // loop the properties and load them
    // Load the configs
    String configFiles = LogFeederUtil.getStringProperty("config.files");
    if (configFiles == null) {
      configFiles = LogFeederUtil.getStringProperty("config.file",
        "config.json");
    }
    logger.info("config.files=" + configFiles);
    for (String rawConfigFileName : configFiles.split(",")) {
      // Tolerate "a.json, b.json" and stray commas.
      String configFileName = rawConfigFileName.trim();
      if (configFileName.isEmpty()) {
        continue;
      }
      logger.info("Going to load config file:" + configFileName);
      File configFile = new File(configFileName);
      if (configFile.exists() && configFile.isFile()) {
        logger.info("Config file exists in path."
          + configFile.getAbsolutePath());
        loadConfigsUsingFile(configFile);
      } else {
        // Let's try to load it from class loader
        logger.info("Trying to load config file from classloader: "
          + configFileName);
        laodConfigsUsingClassLoader(configFileName);
        logger.info("Loaded config file from classloader: "
          + configFileName);
      }
    }
    mergeAllConfigs();
    outMgr.setOutputList(outputList);
    for (Output output : outputList) {
      output.init();
    }
    inputMgr.init();
    metricsMgr.init();
    //starting timer to fetch config from solr
    LogfeederScheduler.INSTANCE.start();
    logger.debug("==============");
  }

  /**
   * Loads a config file from the class path. (The method name keeps its
   * historical spelling so existing callers still compile.)
   *
   * @param configFileName resource name looked up through the class loader
   * @throws Exception when the resource is missing or cannot be parsed
   */
  void laodConfigsUsingClassLoader(String configFileName) throws Exception {
    // getResourceAsStream() only promises an InputStream; casting it to
    // BufferedInputStream fails for other stream implementations.
    InputStream resourceStream = this.getClass().getClassLoader()
      .getResourceAsStream(configFileName);
    if (resourceStream == null) {
      throw new Exception("Can't find configFile=" + configFileName);
    }
    try (BufferedReader br = new BufferedReader(new InputStreamReader(
      resourceStream))) {
      loadConfigs(readFile(br));
    }
  }

  /**
   * This method loads the configurations from the given file.
   *
   * @param configFile config file on the local file system
   * @throws Exception when the file cannot be read or parsed
   */
  void loadConfigsUsingFile(File configFile) throws Exception {
    try (FileInputStream fileInputStream = new FileInputStream(configFile);
      BufferedReader br = new BufferedReader(new InputStreamReader(
        fileInputStream))) {
      loadConfigs(readFile(br));
    } catch (Exception t) {
      logger.error("Error opening config file. configFilePath="
        + configFile.getAbsolutePath(), t);
      throw t;
    }
  }

  /**
   * Parses one JSON config document and appends its {@code global},
   * {@code input}, {@code filter} and {@code output} sections (keys matched
   * case-insensitively) to the corresponding config lists.
   *
   * @param configData JSON text of one config file
   * @throws Exception when the JSON cannot be parsed
   */
  @SuppressWarnings("unchecked")
  void loadConfigs(String configData) throws Exception {
    Type type = new TypeToken<Map<String, Object>>() {
    }.getType();
    Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(
      configData, type);
    if (configMap == null) {
      // Gson returns null for empty input.
      logger.warn("Config data is empty. Nothing to load");
      return;
    }

    for (Map.Entry<String, Object> entry : configMap.entrySet()) {
      String key = entry.getKey();
      Object section = entry.getValue();
      if (section == null) {
        // An explicit JSON null section carries no config.
        continue;
      }
      if (key.equalsIgnoreCase("global")) {
        globalConfigList.add((Map<String, Object>) section);
      } else if (key.equalsIgnoreCase("input")) {
        inputConfigList.addAll((List<Map<String, Object>>) section);
      } else if (key.equalsIgnoreCase("filter")) {
        filterConfigList.addAll((List<Map<String, Object>>) section);
      } else if (key.equalsIgnoreCase("output")) {
        outputConfigList.addAll((List<Map<String, Object>>) section);
      }
    }
  }

  /**
   * Merges the global blocks, then builds outputs, inputs and per-input
   * filter chains, associates enabled outputs with inputs and finally drops
   * inputs that ended up without any filter.
   */
  private void mergeAllConfigs() {
    globalMap = mergeConfigs(globalConfigList);

    // Sort the filter blocks
    sortBlocks(filterConfigList);

    createOutputs();
    createInputs();
    List<Input> toRemoveInputList = attachFilters();

    // Associate valid outputs to inputs. A LinkedHashSet keeps the outputs in
    // configuration order instead of hash order.
    Set<Output> usedOutputSet = new LinkedHashSet<Output>();
    for (Input input : inputMgr.getInputList()) {
      for (Output output : outputList) {
        boolean ret = LogFeederUtil.isEnabled(output.getConfigs(),
          input.getConfigs());
        if (ret) {
          usedOutputSet.add(output);
          input.addOutput(output);
        }
      }
    }
    outputList = usedOutputSet;

    for (Input toRemoveInput : toRemoveInputList) {
      logger.warn("There are no filters, we will ignore this input. "
        + toRemoveInput.getShortDescription());
      inputMgr.removeInput(toRemoveInput);
    }
  }

  /** Instantiates and configures every enabled output block. */
  private void createOutputs() {
    for (Map<String, Object> map : outputConfigList) {
      if (map == null) {
        continue;
      }
      mergeBlocks(globalMap, map);

      String value = (String) map.get("destination");
      if (value == null || value.isEmpty()) {
        logger.error("Output block doesn't have destination element");
        continue;
      }
      String classFullName = AliasUtil.getInstance().readAlias(value,
        ALIAS_TYPE.OUTPUT, ALIAS_PARAM.KLASS);
      if (classFullName == null || classFullName.isEmpty()) {
        logger.error("Destination block doesn't have output element");
        continue;
      }
      Output output = (Output) LogFeederUtil.getClassInstance(classFullName,
        ALIAS_TYPE.OUTPUT);
      if (output == null) {
        logger.error("Destination Object is null");
        continue;
      }

      output.setDestination(value);
      output.loadConfig(map);

      // We will only check for is_enabled out here. Later on we will
      // check whether this output is enabled for the input
      boolean isEnabled = output.getBooleanValue("is_enabled", true);
      if (isEnabled) {
        outputList.add(output);
        output.logConfgs(Level.INFO);
      } else {
        logger.info("Output is disabled. So ignoring it. "
          + output.getShortDescription());
      }
    }
  }

  /** Instantiates every enabled input block and hands it to the input manager. */
  private void createInputs() {
    for (Map<String, Object> map : inputConfigList) {
      if (map == null) {
        continue;
      }
      mergeBlocks(globalMap, map);

      String value = (String) map.get("source");
      if (value == null || value.isEmpty()) {
        logger.error("Input block doesn't have source element");
        continue;
      }
      String classFullName = AliasUtil.getInstance().readAlias(value,
        ALIAS_TYPE.INPUT, ALIAS_PARAM.KLASS);
      if (classFullName == null || classFullName.isEmpty()) {
        logger.error("Source block doesn't have source element");
        continue;
      }
      Input input = (Input) LogFeederUtil.getClassInstance(classFullName,
        ALIAS_TYPE.INPUT);
      if (input == null) {
        logger.error("Source Object is null");
        continue;
      }

      input.setType(value);
      input.loadConfig(map);

      if (input.isEnabled()) {
        input.setOutputMgr(outMgr);
        input.setInputMgr(inputMgr);
        inputMgr.add(input);
        input.logConfgs(Level.INFO);
      } else {
        logger.info("Input is disabled. So ignoring it. "
          + input.getShortDescription());
      }
    }
  }

  /**
   * Builds a separate filter chain for every input, so each filter instance
   * can keep state per input.
   *
   * @return the inputs that ended up without a first filter
   */
  private List<Input> attachFilters() {
    List<Input> inputsWithoutFilter = new ArrayList<Input>();
    for (Input input : inputMgr.getInputList()) {
      Filter prevFilter = null;
      for (Map<String, Object> map : filterConfigList) {
        if (map == null) {
          continue;
        }
        mergeBlocks(globalMap, map);

        String value = (String) map.get("filter");
        if (value == null || value.isEmpty()) {
          logger.error("Filter block doesn't have filter element");
          continue;
        }

        String classFullName = AliasUtil.getInstance().readAlias(value,
          ALIAS_TYPE.FILTER, ALIAS_PARAM.KLASS);
        if (classFullName == null || classFullName.isEmpty()) {
          logger.error("Filter block doesn't have filter element");
          continue;
        }
        Filter filter = (Filter) LogFeederUtil.getClassInstance(
          classFullName, ALIAS_TYPE.FILTER);
        if (filter == null) {
          logger.error("Filter Object is null");
          continue;
        }
        filter.loadConfig(map);
        filter.setInput(input);

        if (filter.isEnabled()) {
          filter.setOutputMgr(outMgr);
          if (prevFilter == null) {
            input.setFirstFilter(filter);
          } else {
            prevFilter.setNextFilter(filter);
          }
          prevFilter = filter;
          filter.logConfgs(Level.INFO);
        } else {
          logger.debug("Ignoring filter "
            + filter.getShortDescription() + " for input "
            + input.getShortDescription());
        }
      }
      if (input.getFirstFilter() == null) {
        inputsWithoutFilter.add(input);
      }
    }
    return inputsWithoutFilter;
  }

  /**
   * Sorts config blocks in place by their {@code sort_order} value. Blocks
   * without a usable {@code sort_order} are ordered as 0; the sort is stable,
   * so blocks with equal order keep their configured sequence.
   *
   * @param blockList blocks to sort
   */
  private void sortBlocks(List<Map<String, Object>> blockList) {
    Collections.sort(blockList, new Comparator<Map<String, Object>>() {
      @Override
      public int compare(Map<String, Object> o1, Map<String, Object> o2) {
        // Integer.compare avoids the overflow of "o1Value - o2Value", and
        // reading both sides through the same helper keeps the ordering
        // consistent, as the Comparator contract requires.
        return Integer.compare(sortOrderOf(o1), sortOrderOf(o2));
      }
    });
  }

  /**
   * @param block a config block, may be {@code null}
   * @return the block's {@code sort_order} truncated to an int, or 0 when it
   *         is missing or not numeric
   */
  private static int sortOrderOf(Map<String, Object> block) {
    Object sortOrder = (block == null) ? null : block.get("sort_order");
    if (sortOrder == null) {
      return 0;
    }
    if (sortOrder instanceof Number) {
      return ((Number) sortOrder).intValue();
    }
    try {
      return (int) Double.parseDouble(sortOrder.toString());
    } catch (NumberFormatException e) {
      logger.error("Value is not of type Number. class="
        + sortOrder.getClass().getName() + ", value="
        + sortOrder.toString() + ", map=" + block.toString());
      return 0;
    }
  }

  /**
   * Folds a list of config blocks into one map; earlier blocks win over later
   * ones for keys they both define.
   *
   * @param configList blocks to merge
   * @return the merged map, never {@code null}
   */
  private Map<String, Object> mergeConfigs(
    List<Map<String, Object>> configList) {
    Map<String, Object> mergedConfig = new HashMap<String, Object>();
    for (Map<String, Object> config : configList) {
      mergeBlocks(config, mergedConfig);
    }
    return mergedConfig;
  }

  /**
   * Copies into {@code toMap} everything from {@code fromMap} that
   * {@code toMap} does not define itself. Map-valued entries are merged one
   * level deep (field by field, from a clone of the source map); all other
   * entries are copied only when the key is absent in {@code toMap}.
   *
   * @param fromMap source of defaults; {@code null} is treated as empty
   * @param toMap map that receives the missing entries
   */
  private void mergeBlocks(Map<String, Object> fromMap,
    Map<String, Object> toMap) {
    if (fromMap == null || toMap == null) {
      return;
    }
    // Merge the non-string
    for (Map.Entry<String, Object> entry : fromMap.entrySet()) {
      String key = entry.getKey();
      Object objValue = entry.getValue();
      if (!(objValue instanceof Map)) {
        continue;
      }
      @SuppressWarnings("unchecked")
      Map<String, Object> globalFields = LogFeederUtil
        .cloneObject((Map<String, Object>) objValue);

      Object localValue = toMap.get(key);
      Map<String, Object> localFields;
      if (localValue == null) {
        localFields = new HashMap<String, Object>();
        toMap.put(key, localFields);
      } else if (localValue instanceof Map) {
        @SuppressWarnings("unchecked")
        Map<String, Object> existingFields = (Map<String, Object>) localValue;
        localFields = existingFields;
      } else {
        // The local block overrides this key with a non-map value; keep it
        // instead of failing with a ClassCastException.
        continue;
      }

      if (globalFields != null) {
        for (Map.Entry<String, Object> field : globalFields.entrySet()) {
          if (!localFields.containsKey(field.getKey())) {
            localFields.put(field.getKey(), field.getValue());
          }
        }
      }
    }

    // Let's add the rest of the top level fields if missing
    for (Map.Entry<String, Object> entry : fromMap.entrySet()) {
      if (!toMap.containsKey(entry.getKey())) {
        toMap.put(entry.getKey(), entry.getValue());
      }
    }
  }

  /**
   * Starts input monitoring, registers the shutdown hook and starts the
   * daemon thread that logs statistics every 30 seconds and triggers the
   * checkpoint cleanup once per clean interval.
   */
  private void monitor() throws Exception {
    inputMgr.monitor();
    Runtime.getRuntime().addShutdownHook(new JVMShutdownHook());

    Thread statLogger = new Thread("statLogger") {

      @Override
      public void run() {
        while (true) {
          try {
            Thread.sleep(STAT_LOG_INTERVAL_MS);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
          try {
            logStats();
          } catch (Throwable t) {
            logger.error(
              "LogStats: Caught exception while logging stats.",
              t);
          }

          if (System.currentTimeMillis() > (lastCheckPointCleanedMS + checkPointCleanIntervalMS)) {
            lastCheckPointCleanedMS = System.currentTimeMillis();
            inputMgr.cleanCheckPointFiles();
          }
        }
      }

    };
    statLogger.setDaemon(true);
    statLogger.start();

  }

  /**
   * Logs input and output statistics and, when metrics are enabled, hands
   * the collected metrics to the metrics manager.
   */
  private void logStats() {
    inputMgr.logStats();
    outMgr.logStats();

    if (metricsMgr.isMetricsEnabled()) {
      List<MetricCount> metricsList = new ArrayList<MetricCount>();
      inputMgr.addMetricsContainers(metricsList);
      outMgr.addMetricsContainers(metricsList);
      metricsMgr.useMetrics(metricsList);
    }
  }

  /**
   * Reads the whole reader into one string. Lines are concatenated without
   * any separator, so line breaks of the source are not preserved.
   *
   * @param br reader to consume; it is not closed here
   * @return the concatenated lines
   * @throws Exception when reading fails
   */
  public String readFile(BufferedReader br) throws Exception {
    try {
      StringBuilder sb = new StringBuilder();
      for (String line = br.readLine(); line != null; line = br.readLine()) {
        sb.append(line);
      }
      return sb.toString();
    } catch (Exception t) {
      logger.error("Error loading properties file.", t);
      throw t;
    }
  }

  /**
   * @return the outputs in use (after {@link #init()}, only outputs that are
   *         associated with at least one input)
   */
  public Collection<Output> getOutputList() {
    return outputList;
  }

  /**
   * @return the output manager
   */
  public OutputMgr getOutMgr() {
    return outMgr;
  }

  public static void main(String[] args) {
    LogFeeder logFeeder = new LogFeeder(args);
    logFeeder.run(logFeeder);
  }

  /**
   * Same as {@link #main(String[])}; creates a LogFeeder and runs it.
   *
   * @param args command line arguments
   */
  public static void run(String[] args) {
    LogFeeder logFeeder = new LogFeeder(args);
    logFeeder.run(logFeeder);
  }

  /**
   * Initializes the given LogFeeder and starts monitoring. Any failure is
   * logged as fatal and terminates the JVM with exit code 1.
   *
   * @param logFeeder the instance to initialize and run
   */
  public void run(LogFeeder logFeeder) {
    try {
      long startTimeMS = System.currentTimeMillis();
      logFeeder.init();
      long endTimeMS = System.currentTimeMillis();
      logger.info("Took " + (endTimeMS - startTimeMS)
        + " ms to initialize");
      logFeeder.monitor();

    } catch (Throwable t) {
      logger.fatal("Caught exception in main.", t);
      System.exit(1);
    }
  }

  /**
   * Shutdown hook: closes inputs and outputs, checks in all inputs and logs
   * the final statistics. Never lets an exception escape.
   */
  private class JVMShutdownHook extends Thread {

    @Override
    public void run() {
      try {
        logger.info("Processing is shutting down.");

        inputMgr.close();
        outMgr.close();
        inputMgr.checkInAll();

        logStats();

        logger.info("LogSearch is exiting.");
      } catch (Throwable t) {
        // Best effort only: report the problem, but never fail the shutdown.
        logger.error("Error while shutting down.", t);
      }
    }
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java new file mode 100644 index 0000000..e53a227 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ambari.logfeeder; + +import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.log4j.Logger; + +public class LogFeederAMSClient extends AbstractTimelineMetricsSink { + static Logger logger = Logger.getLogger(LogFeederAMSClient.class); + + String collectorHosts = null; + + public LogFeederAMSClient() { + collectorHosts = LogFeederUtil + .getStringProperty("metrics.collector.hosts"); + if (collectorHosts != null && collectorHosts.trim().length() == 0) { + collectorHosts = null; + } + if (collectorHosts != null) { + collectorHosts = collectorHosts.trim(); + } + logger.info("AMS collector URL=" + collectorHosts); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink# + * getCollectorUri() + */ + @Override + public String getCollectorUri() { + + return collectorHosts; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink# + * getTimeoutSeconds() + */ + @Override + protected int getTimeoutSeconds() { + // TODO: Hard coded timeout + return 10; + } + + @Override + protected void emitMetrics(TimelineMetrics metrics) { + super.emitMetrics(metrics); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java new file mode 100644 index 0000000..7303694 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.lang.reflect.Type; +import java.net.URL; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.ambari.logfeeder.filter.Filter; +import org.apache.ambari.logfeeder.input.Input; +import org.apache.ambari.logfeeder.mapper.Mapper; +import org.apache.ambari.logfeeder.output.Output; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; + +/** + * This class contains utility methods used by LogFeeder + */ +public class LogFeederUtil { + static Logger logger = 
Logger.getLogger(LogFeederUtil.class); + + final static int HASH_SEED = 31174077; + public final static String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; + static Gson gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create(); + + static Properties props; + + private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>(); + private static int logInterval = 30000; // 30 seconds + + public static Gson getGson() { + return gson; + } + + /** + * This method will read the properties from System, followed by propFile + * and finally from the map + * + * @param propFile + * @param propNVList + * @throws Exception + */ + static public void loadProperties(String propFile, String[] propNVList) + throws Exception { + logger.info("Loading properties. propFile=" + propFile); + props = new Properties(System.getProperties()); + boolean propLoaded = false; + + // First get properties file path from environment value + String propertiesFilePath = System.getProperty("properties"); + if (propertiesFilePath != null && !propertiesFilePath.isEmpty()) { + File propertiesFile = new File(propertiesFilePath); + if (propertiesFile.exists() && propertiesFile.isFile()) { + logger.info("Properties file path set in environment. Loading properties file=" + + propertiesFilePath); + FileInputStream fileInputStream = null; + try { + fileInputStream = new FileInputStream(propertiesFile); + props.load(fileInputStream); + propLoaded = true; + } catch (Throwable t) { + logger.error("Error loading properties file. properties file=" + + propertiesFile.getAbsolutePath()); + } finally { + if (fileInputStream != null) { + try { + fileInputStream.close(); + } catch (Throwable t) { + // Ignore error + } + } + } + } else { + logger.error("Properties file path set in environment, but file not found. 
properties file=" + + propertiesFilePath); + } + } + + if (!propLoaded) { + // Properties not yet loaded, let's try from class loader + BufferedInputStream fileInputStream = (BufferedInputStream) LogFeeder.class + .getClassLoader().getResourceAsStream(propFile); + if (fileInputStream != null) { + logger.info("Loading properties file " + propFile + + " from classpath"); + props.load(fileInputStream); + propLoaded = true; + } else { + logger.fatal("Properties file not found in classpath. properties file name= " + + propFile); + } + } + + if (!propLoaded) { + logger.fatal("Properties file is not loaded."); + throw new Exception("Properties not loaded"); + } else { + // Let's load properties from argument list + updatePropertiesFromMap(propNVList); + } + } + + /** + * @param nvList + */ + private static void updatePropertiesFromMap(String[] nvList) { + if (nvList == null) { + return; + } + logger.info("Trying to load additional proeprties from argument paramters. nvList.length=" + + nvList.length); + if (nvList != null && nvList.length > 0) { + for (String nv : nvList) { + logger.info("Passed nv=" + nv); + if (nv.startsWith("-") && nv.length() > 1) { + nv = nv.substring(1); + logger.info("Stripped nv=" + nv); + int i = nv.indexOf("="); + if (nv.length() > i) { + logger.info("Candidate nv=" + nv); + String name = nv.substring(0, i); + String value = nv.substring(i + 1); + logger.info("Adding property from argument to properties. 
name=" + + name + ", value=" + value); + props.put(name, value); + } + } + } + } + } + + static public String getStringProperty(String key) { + if (props != null) { + return props.getProperty(key); + } + return null; + } + + static public String getStringProperty(String key, String defaultValue) { + if (props != null) { + return props.getProperty(key, defaultValue); + } + return defaultValue; + } + + static public boolean getBooleanProperty(String key, boolean defaultValue) { + String strValue = getStringProperty(key); + return toBoolean(strValue, defaultValue); + } + + private static boolean toBoolean(String strValue, boolean defaultValue) { + boolean retValue = defaultValue; + if (!StringUtils.isEmpty(strValue)) { + if (strValue.equalsIgnoreCase("true") + || strValue.equalsIgnoreCase("yes")) { + retValue = true; + } else { + retValue = false; + } + } + return retValue; + } + + static public int getIntProperty(String key, int defaultValue) { + String strValue = getStringProperty(key); + int retValue = defaultValue; + retValue = objectToInt(strValue, retValue, ", key=" + key); + return retValue; + } + + public static int objectToInt(Object objValue, int retValue, + String errMessage) { + if (objValue == null) { + return retValue; + } + String strValue = objValue.toString(); + if (!StringUtils.isEmpty(strValue)) { + try { + retValue = Integer.parseInt(strValue); + } catch (Throwable t) { + logger.error("Error parsing integer value. 
str=" + strValue + + ", " + errMessage); + } + } + return retValue; + } + + static public boolean isEnabled(Map<String, Object> configs) { + return isEnabled(configs, configs); + } + + static public boolean isEnabled(Map<String, Object> conditionConfigs, + Map<String, Object> valueConfigs) { + boolean allow = toBoolean((String) valueConfigs.get("is_enabled"), true); + @SuppressWarnings("unchecked") + Map<String, Object> conditions = (Map<String, Object>) conditionConfigs + .get("conditions"); + if (conditions != null && conditions.size() > 0) { + allow = false; + for (String conditionType : conditions.keySet()) { + if (conditionType.equalsIgnoreCase("fields")) { + @SuppressWarnings("unchecked") + Map<String, Object> fields = (Map<String, Object>) conditions + .get("fields"); + for (String fieldName : fields.keySet()) { + Object values = fields.get(fieldName); + if (values instanceof String) { + allow = isFieldConditionMatch(valueConfigs, + fieldName, (String) values); + } else { + @SuppressWarnings("unchecked") + List<String> listValues = (List<String>) values; + for (String stringValue : listValues) { + allow = isFieldConditionMatch(valueConfigs, + fieldName, stringValue); + if (allow) { + break; + } + } + } + if (allow) { + break; + } + } + } + if (allow) { + break; + } + } + } + return allow; + } + + static public boolean isFieldConditionMatch(Map<String, Object> configs, + String fieldName, String stringValue) { + boolean allow = false; + String fieldValue = (String) configs.get(fieldName); + if (fieldValue != null && fieldValue.equalsIgnoreCase(stringValue)) { + allow = true; + } else { + @SuppressWarnings("unchecked") + Map<String, Object> addFields = (Map<String, Object>) configs + .get("add_fields"); + if (addFields != null && addFields.get(fieldName) != null) { + String addFieldValue = (String) addFields.get(fieldName); + if (stringValue.equalsIgnoreCase(addFieldValue)) { + allow = true; + } + } + + } + return allow; + } + + static public void 
logStatForMetric(MetricCount metric, String prefixStr, + String postFix) { + long currStat = metric.count; + long currMS = System.currentTimeMillis(); + if (currStat > metric.prevLogCount) { + if (postFix == null) { + postFix = ""; + } + logger.info(prefixStr + ": total_count=" + metric.count + + ", duration=" + (currMS - metric.prevLogMS) / 1000 + + " secs, count=" + (currStat - metric.prevLogCount) + + postFix); + } + metric.prevLogCount = currStat; + metric.prevLogMS = currMS; + } + + static public void logCountForMetric(MetricCount metric, String prefixStr, + String postFix) { + logger.info(prefixStr + ": count=" + metric.count + postFix); + } + + public static Map<String, Object> cloneObject(Map<String, Object> map) { + if (map == null) { + return null; + } + String jsonStr = gson.toJson(map); + // We need to clone it, so we will create a JSON string and convert it + // back + Type type = new TypeToken<Map<String, Object>>() { + }.getType(); + return gson.fromJson(jsonStr, type); + } + + public static Map<String, Object> toJSONObject(String jsonStr) { + Type type = new TypeToken<Map<String, Object>>() { + }.getType(); + return gson.fromJson(jsonStr, type); + } + + static public boolean logErrorMessageByInterval(String key, String message, + Throwable e, Logger callerLogger, Level level) { + + LogHistory log = logHistoryList.get(key); + if (log == null) { + log = new LogHistory(); + logHistoryList.put(key, log); + } + if ((System.currentTimeMillis() - log.lastLogTime) > logInterval) { + log.lastLogTime = System.currentTimeMillis(); + int counter = log.counter; + log.counter = 0; + if (counter > 0) { + message += ". 
Messages suppressed before: " + counter; + } + if (e == null) { + callerLogger.log(level, message); + } else { + callerLogger.log(level, message, e); + } + + return true; + } else { + log.counter++; + } + return false; + + } + + static public String subString(String str, int maxLength) { + if (str == null || str.length() == 0) { + return ""; + } + maxLength = str.length() < maxLength ? str.length() : maxLength; + return str.substring(0, maxLength); + } + + static public long genHash(String value) { + if (value == null) { + value = "null"; + } + return MurmurHash.hash64A(value.getBytes(), HASH_SEED); + } + + static class LogHistory { + long lastLogTime = 0; + int counter = 0; + } + + public static String getDate(String timeStampStr) { + try { + DateFormat sdf = new SimpleDateFormat(DATE_FORMAT); + Date netDate = (new Date(Long.parseLong(timeStampStr))); + return sdf.format(netDate); + } catch (Exception ex) { + return null; + } + } + + public static File getFileFromClasspath(String filename) { + URL fileCompleteUrl = Thread.currentThread().getContextClassLoader() + .getResource(filename); + logger.debug("File Complete URI :" + fileCompleteUrl); + File file = null; + try { + file = new File(fileCompleteUrl.toURI()); + } catch (Exception exception) { + logger.debug(exception.getMessage(), exception.getCause()); + } + return file; + } + + public static Object getClassInstance(String classFullName, AliasUtil.ALIAS_TYPE aliasType) { + Object instance = null; + try { + instance = (Object) Class.forName(classFullName).getConstructor().newInstance(); + } catch (Exception exception) { + logger.error("Unsupported class =" + classFullName, exception.getCause()); + } + // check instance class as par aliasType + if (instance != null) { + boolean isValid = false; + switch (aliasType) { + case FILTER: + isValid = Filter.class.isAssignableFrom(instance.getClass()); + break; + case INPUT: + isValid = Input.class.isAssignableFrom(instance.getClass()); + break; + case OUTPUT: + 
isValid = Output.class.isAssignableFrom(instance.getClass()); + break; + case MAPPER: + isValid = Mapper.class.isAssignableFrom(instance.getClass()); + break; + default: + // by default consider all are valid class + isValid = true; + } + if (!isValid) { + logger.error("Not a valid class :" + classFullName + " AliasType :" + aliasType.name()); + } + } + return instance; + } + + /** + * @param fileName + * @return + */ + public static HashMap<String, Object> readJsonFromFile(File jsonFile) { + ObjectMapper mapper = new ObjectMapper(); + try { + HashMap<String, Object> jsonmap = mapper.readValue(jsonFile, new TypeReference<HashMap<String, Object>>() { + }); + return jsonmap; + } catch (JsonParseException e) { + logger.error(e, e.getCause()); + } catch (JsonMappingException e) { + logger.error(e, e.getCause()); + } catch (IOException e) { + logger.error(e, e.getCause()); + } + return new HashMap<String, Object>(); + } + + public static boolean isListContains(List<String> list, String str, boolean caseSensitive) { + if (list != null) { + for (String value : list) { + if (value != null) { + if (caseSensitive) { + if (value.equals(str)) { + return true; + } + } else { + if (value.equalsIgnoreCase(str)) { + return true; + } + } + if (value.equalsIgnoreCase("ALL")) { + return true; + } + } + } + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java new file mode 100644 index 0000000..c715881 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java @@ -0,0 +1,33 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder; + +public class MetricCount { + public String metricsName = null; + public boolean isPointInTime = false; + + public long count = 0; + public long prevLogCount = 0; + public long prevLogMS = System.currentTimeMillis(); + public long prevPublishCount = 0; + public long prevPublishMS = 0; // We will try to publish one immediately + public int publishCount = 0; // Count of published metrics. 
Used for first + // time sending metrics +} http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java new file mode 100644 index 0000000..2152d14 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ambari.logfeeder; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.TreeMap; + +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.log4j.Logger; + +public class MetricsMgr { + static Logger logger = Logger.getLogger(MetricsMgr.class); + + boolean isMetricsEnabled = false; + String nodeHostName = null; + String appId = "logfeeder"; + + long lastPublishTimeMS = 0; // Let's do the first publish immediately + long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the + // clock + + int publishIntervalMS = 60 * 1000; + int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep + // the metrics in memory forever + HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>(); + LogFeederAMSClient amsClient = null; + + public void init() { + logger.info("Initializing MetricsMgr()"); + amsClient = new LogFeederAMSClient(); + + if (amsClient.getCollectorUri() != null) { + nodeHostName = LogFeederUtil.getStringProperty("node.hostname"); + if (nodeHostName == null) { + try { + nodeHostName = InetAddress.getLocalHost().getHostName(); + } catch (Throwable e) { + logger.warn( + "Error getting hostname using InetAddress.getLocalHost().getHostName()", + e); + } + if (nodeHostName == null) { + try { + nodeHostName = InetAddress.getLocalHost() + .getCanonicalHostName(); + } catch (Throwable e) { + logger.warn( + "Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()", + e); + } + } + } + if (nodeHostName == null) { + isMetricsEnabled = false; + logger.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics"); + } else { + isMetricsEnabled = true; + logger.info("LogFeeder Metrics is enabled. 
Metrics host=" + + amsClient.getCollectorUri()); + } + } else { + logger.info("LogFeeder Metrics publish is disabled"); + } + } + + /** + * @return + */ + public boolean isMetricsEnabled() { + return isMetricsEnabled; + } + + /** + * @param metricsList + */ + synchronized public void useMetrics(List<MetricCount> metricsList) { + if (!isMetricsEnabled) { + return; + } + logger.info("useMetrics() metrics.size=" + metricsList.size()); + long currMS = System.currentTimeMillis(); + Long currMSLong = new Long(currMS); + for (MetricCount metric : metricsList) { + if (metric.metricsName == null) { + logger.debug("metric.metricsName is null"); + // Metrics is not meant to be published + continue; + } + long currCount = metric.count; + if (!metric.isPointInTime && metric.publishCount > 0 + && currCount <= metric.prevPublishCount) { + // No new data added, so let's ignore it + logger.debug("Nothing changed. " + metric.metricsName + + ", currCount=" + currCount + ", prevPublishCount=" + + metric.prevPublishCount); + continue; + } + metric.publishCount++; + + TimelineMetric timelineMetric = metricsMap.get(metric.metricsName); + if (timelineMetric == null) { + logger.debug("Creating new metric obbject for " + + metric.metricsName); + // First time for this metric + timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(metric.metricsName); + timelineMetric.setHostName(nodeHostName); + timelineMetric.setAppId(appId); + timelineMetric.setStartTime(currMS); + timelineMetric.setType("Long"); + timelineMetric.setMetricValues(new TreeMap<Long, Double>()); + + metricsMap.put(metric.metricsName, timelineMetric); + } + logger.debug("Adding metrics=" + metric.metricsName); + if (metric.isPointInTime) { + timelineMetric.getMetricValues().put(currMSLong, + new Double(currCount)); + } else { + Double value = timelineMetric.getMetricValues().get(currMSLong); + if (value == null) { + value = new Double(0); + } + value += (currCount - metric.prevPublishCount); + 
timelineMetric.getMetricValues().put(currMSLong, value); + metric.prevPublishCount = currCount; + metric.prevPublishMS = currMS; + } + } + + if (metricsMap.size() > 0 + && currMS - lastPublishTimeMS > publishIntervalMS) { + try { + // Time to publish + TimelineMetrics timelineMetrics = new TimelineMetrics(); + List<TimelineMetric> timeLineMetricList = new ArrayList<TimelineMetric>(); + timeLineMetricList.addAll(metricsMap.values()); + timelineMetrics.setMetrics(timeLineMetricList); + amsClient.emitMetrics(timelineMetrics); + logger.info("Published " + timeLineMetricList.size() + + " metrics to AMS"); + metricsMap.clear(); + timeLineMetricList.clear(); + lastPublishTimeMS = currMS; + } catch (Throwable t) { + logger.warn("Error sending metrics to AMS.", t); + if (currMS - lastFailedPublishTimeMS > maxMetricsBuffer) { + logger.error("AMS was not sent for last " + + maxMetricsBuffer + / 1000 + + " seconds. Purging it and will start rebuilding it again"); + metricsMap.clear(); + lastFailedPublishTimeMS = currMS; + } + } + } else { + logger.info("Not publishing metrics. 
metrics.size()=" + + metricsMap.size() + ", lastPublished=" + + (currMS - lastPublishTimeMS) / 1000 + + " seconds ago, intervalConfigured=" + publishIntervalMS + / 1000); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java new file mode 100644 index 0000000..2a54f28 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.logfeeder; + +import com.google.common.primitives.Ints; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * This is a very fast, non-cryptographic hash suitable for general hash-based + * lookup. See http://murmurhash.googlepages.com/ for more details. 
+ * <p/> + * <p>The C version of MurmurHash 2.0 found at that site was ported + * to Java by Andrzej Bialecki (ab at getopt org).</p> + */ +public final class MurmurHash { + + private MurmurHash() { + } + + /** + * Hashes an int. + * + * @param data The int to hash. + * @param seed The seed for the hash. + * @return The 32 bit hash of the bytes in question. + */ + public static int hash(int data, int seed) { + return hash(ByteBuffer.wrap(Ints.toByteArray(data)), seed); + } + + /** + * Hashes bytes in an array. + * + * @param data The bytes to hash. + * @param seed The seed for the hash. + * @return The 32 bit hash of the bytes in question. + */ + public static int hash(byte[] data, int seed) { + return hash(ByteBuffer.wrap(data), seed); + } + + /** + * Hashes bytes in part of an array. + * + * @param data The data to hash. + * @param offset Where to start munging. + * @param length How many bytes to process. + * @param seed The seed to start with. + * @return The 32-bit hash of the data in question. + */ + public static int hash(byte[] data, int offset, int length, int seed) { + return hash(ByteBuffer.wrap(data, offset, length), seed); + } + + /** + * Hashes the bytes in a buffer from the current position to the limit. + * + * @param buf The bytes to hash. + * @param seed The seed for the hash. + * @return The 32 bit murmur hash of the bytes in the buffer. 
+ */ + public static int hash(ByteBuffer buf, int seed) { + // save byte order for later restoration + ByteOrder byteOrder = buf.order(); + buf.order(ByteOrder.LITTLE_ENDIAN); + + int m = 0x5bd1e995; + int r = 24; + + int h = seed ^ buf.remaining(); + + while (buf.remaining() >= 4) { + int k = buf.getInt(); + + k *= m; + k ^= k >>> r; + k *= m; + + h *= m; + h ^= k; + } + + if (buf.remaining() > 0) { + ByteBuffer finish = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN); + // for big-endian version, use this first: + // finish.position(4-buf.remaining()); + finish.put(buf).rewind(); + h ^= finish.getInt(); + h *= m; + } + + h ^= h >>> 13; + h *= m; + h ^= h >>> 15; + + buf.order(byteOrder); + return h; + } + + + public static long hash64A(byte[] data, int seed) { + return hash64A(ByteBuffer.wrap(data), seed); + } + + public static long hash64A(byte[] data, int offset, int length, int seed) { + return hash64A(ByteBuffer.wrap(data, offset, length), seed); + } + + public static long hash64A(ByteBuffer buf, int seed) { + ByteOrder byteOrder = buf.order(); + buf.order(ByteOrder.LITTLE_ENDIAN); + + long m = 0xc6a4a7935bd1e995L; + int r = 47; + + long h = seed ^ (buf.remaining() * m); + + while (buf.remaining() >= 8) { + long k = buf.getLong(); + + k *= m; + k ^= k >>> r; + k *= m; + + h ^= k; + h *= m; + } + + if (buf.remaining() > 0) { + ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN); + // for big-endian version, do this first: + // finish.position(8-buf.remaining()); + finish.put(buf).rewind(); + h ^= finish.getLong(); + h *= m; + } + + h ^= h >>> r; + h *= m; + h ^= h >>> r; + + buf.order(byteOrder); + return h; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java ---------------------------------------------------------------------- diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java new file mode 100644 index 0000000..5b70bca --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ambari.logfeeder; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.ambari.logfeeder.input.Input; +import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.logconfig.filter.FilterLogData; +import org.apache.ambari.logfeeder.output.Output; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +public class OutputMgr { + static Logger logger = Logger.getLogger(OutputMgr.class); + + Collection<Output> outputList = new ArrayList<Output>(); + + String hostName = null; + String ipAddress = null; + boolean addMessageMD5 = true; + + private int MAX_OUTPUT_SIZE = 32765; // 32766-1 + static long doc_counter = 0; + public MetricCount messageTruncateMetric = new MetricCount(); + + public OutputMgr() { + // Set the host for this server + try { + InetAddress ip = InetAddress.getLocalHost(); + ipAddress = ip.getHostAddress(); + hostName = ip.getHostName(); + } catch (UnknownHostException e) { + logger.error("Error getting hostname.", e); + } + } + + public Collection<Output> getOutputList() { + return outputList; + } + + public void setOutputList(Collection<Output> outputList) { + this.outputList = outputList; + } + + /** + * @param jsonObj + * @param inputStr + * @param input + */ + public void write(Map<String, Object> jsonObj, InputMarker inputMarker) { + Input input = inputMarker.input; + + // Update the block with the context fields + for (Map.Entry<String, String> entry : input.getContextFields() + .entrySet()) { + if (jsonObj.get(entry.getKey()) == null) { + jsonObj.put(entry.getKey(), entry.getValue()); + } + } + + // TODO: Ideally most of the overrides should be configurable + + // Add the input type + if (jsonObj.get("type") == null) { + jsonObj.put("type", input.getStringValue("type")); + } + if 
(jsonObj.get("path") == null && input.getFilePath() != null) { + jsonObj.put("path", input.getFilePath()); + } + if (jsonObj.get("path") == null && input.getStringValue("path") != null) { + jsonObj.put("path", input.getStringValue("path")); + } + + // Add host if required + if (jsonObj.get("host") == null && hostName != null) { + jsonObj.put("host", hostName); + } + // Add IP if required + if (jsonObj.get("ip") == null && ipAddress != null) { + jsonObj.put("ip", ipAddress); + } + + if (input.isUseEventMD5() || input.isGenEventMD5()) { + String prefix = ""; + Object logtimeObj = jsonObj.get("logtime"); + if (logtimeObj != null) { + if (logtimeObj instanceof Date) { + prefix = "" + ((Date) logtimeObj).getTime(); + } else { + prefix = logtimeObj.toString(); + } + } + Long eventMD5 = LogFeederUtil.genHash(LogFeederUtil.getGson() + .toJson(jsonObj)); + if (input.isGenEventMD5()) { + jsonObj.put("event_md5", prefix + eventMD5.toString()); + } + if (input.isUseEventMD5()) { + jsonObj.put("id", prefix + eventMD5.toString()); + } + } + + // jsonObj.put("@timestamp", new Date()); + jsonObj.put("seq_num", new Long(doc_counter++)); + if (jsonObj.get("id") == null) { + jsonObj.put("id", UUID.randomUUID().toString()); + } + if (jsonObj.get("event_count") == null) { + jsonObj.put("event_count", new Integer(1)); + } + if (inputMarker.lineNumber > 0) { + jsonObj.put("logfile_line_number", new Integer( + inputMarker.lineNumber)); + } + if (jsonObj.containsKey("log_message")) { + // TODO: Let's check size only for log_message for now + String logMessage = (String) jsonObj.get("log_message"); + if (logMessage != null + && logMessage.getBytes().length > MAX_OUTPUT_SIZE) { + messageTruncateMetric.count++; + final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() + + "_MESSAGESIZE"; + LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY, + "Message is too big. size=" + + logMessage.getBytes().length + ", input=" + + input.getShortDescription() + + ". 
Truncating to " + MAX_OUTPUT_SIZE + + ", first upto 100 characters=" + + LogFeederUtil.subString(logMessage, 100), + null, logger, Level.WARN); + logMessage = new String(logMessage.getBytes(), 0, + MAX_OUTPUT_SIZE); + jsonObj.put("log_message", logMessage); + // Add error tags + @SuppressWarnings("unchecked") + List<String> tagsList = (List<String>) jsonObj.get("tags"); + if (tagsList == null) { + tagsList = new ArrayList<String>(); + jsonObj.put("tags", tagsList); + } + tagsList.add("error_message_truncated"); + + } + if (addMessageMD5) { + jsonObj.put("message_md5", + "" + LogFeederUtil.genHash(logMessage)); + } + } + //check log is allowed to send output + if (FilterLogData.INSTANCE.isAllowed(jsonObj)) { + for (Output output : input.getOutputList()) { + try { + output.write(jsonObj, inputMarker); + } catch (Exception e) { + logger.error("Error writing. to " + output.getShortDescription(), e); + } + } + } + } + + public void write(String jsonBlock, InputMarker inputMarker) { + //check log is allowed to send output + if (FilterLogData.INSTANCE.isAllowed(jsonBlock)) { + for (Output output : inputMarker.input.getOutputList()) { + try { + output.write(jsonBlock, inputMarker); + } catch (Exception e) { + logger.error("Error writing. to " + output.getShortDescription(), e); + } + } + } + } + + /** + * Close all the outputs + */ + public void close() { + logger.info("Close called for outputs ..."); + for (Output output : outputList) { + try { + output.setDrain(true); + output.close(); + } catch (Exception e) { + // Ignore + } + } + // Need to get this value from property + int iterations = 30; + int waitTimeMS = 1000; + int i; + boolean allClosed = true; + for (i = 0; i < iterations; i++) { + allClosed = true; + for (Output output : outputList) { + if (!output.isClosed()) { + try { + allClosed = false; + logger.warn("Waiting for output to close. 
" + + output.getShortDescription() + ", " + + (iterations - i) + " more seconds"); + Thread.sleep(waitTimeMS); + } catch (Throwable t) { + // Ignore + } + } + } + if (allClosed) { + break; + } + } + + if (!allClosed) { + logger.warn("Some outpus were not closed. Iterations=" + i); + for (Output output : outputList) { + if (!output.isClosed()) { + logger.warn("Output not closed. Will ignore it." + + output.getShortDescription() + ", pendingCound=" + + output.getPendingCount()); + } + } + } else { + logger.info("All outputs are closed. Iterations=" + i); + } + } + + /** + * + */ + public void logStats() { + for (Output output : outputList) { + output.logStat(); + } + LogFeederUtil.logStatForMetric(messageTruncateMetric, + "Stat: Messages Truncated", null); + } + + /** + * @param metricsList + */ + public void addMetricsContainers(List<MetricCount> metricsList) { + metricsList.add(messageTruncateMetric); + for (Output output : outputList) { + output.addMetricsContainers(metricsList); + } + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java new file mode 100644 index 0000000..aa1edea --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.filter; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.ambari.logfeeder.AliasUtil; +import org.apache.ambari.logfeeder.ConfigBlock; +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.MetricCount; +import org.apache.ambari.logfeeder.OutputMgr; +import org.apache.ambari.logfeeder.AliasUtil.ALIAS_PARAM; +import org.apache.ambari.logfeeder.AliasUtil.ALIAS_TYPE; +import org.apache.ambari.logfeeder.input.Input; +import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.mapper.Mapper; +import org.apache.log4j.Logger; +import org.apache.log4j.Priority; + +public abstract class Filter extends ConfigBlock { + static private Logger logger = Logger.getLogger(Filter.class); + + OutputMgr outputMgr; + Input input; + Filter nextFilter = null; + + Map<String, List<Mapper>> postFieldValueMappers = new HashMap<String, List<Mapper>>(); + + @Override + public void init() throws Exception { + super.init(); + + initializePostMapValues(); + if (nextFilter != null) { + nextFilter.init(); + } + } + + /** + * + */ + @SuppressWarnings("unchecked") + protected void initializePostMapValues() { + // Initialize map values + Map<String, Object> postMapValues = (Map<String, Object>) getConfigValue("post_map_values"); + if 
(postMapValues == null) { + return; + } + for (String fieldName : postMapValues.keySet()) { + List<Map<String, Object>> mapList = null; + Object values = postMapValues.get(fieldName); + if (values instanceof List<?>) { + mapList = (List<Map<String, Object>>) values; + } else { + mapList = new ArrayList<Map<String, Object>>(); + mapList.add((Map<String, Object>) values); + } + for (Map<String, Object> mapObject : mapList) { + for (String mapClassCode : mapObject.keySet()) { + Mapper mapper = getMapper(mapClassCode); + if (mapper == null) { + break; + } + if (mapper.init(getInput().getShortDescription(), + fieldName, mapClassCode, + mapObject.get(mapClassCode))) { + List<Mapper> fieldMapList = postFieldValueMappers + .get(fieldName); + if (fieldMapList == null) { + fieldMapList = new ArrayList<Mapper>(); + postFieldValueMappers.put(fieldName, fieldMapList); + } + fieldMapList.add(mapper); + } + } + } + } + } + + /** + * @param mapClassCode + * @return + */ + protected Mapper getMapper(String mapClassCode) { + String classFullName = AliasUtil.getInstance().readAlias(mapClassCode, ALIAS_TYPE.MAPPER, ALIAS_PARAM.KLASS); + if (classFullName != null && !classFullName.isEmpty()) { + Mapper mapper = (Mapper) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.MAPPER); + return mapper; + } + return null; + } + + public void setOutputMgr(OutputMgr outputMgr) { + this.outputMgr = outputMgr; + } + + public Filter getNextFilter() { + return nextFilter; + } + + public void setNextFilter(Filter nextFilter) { + this.nextFilter = nextFilter; + } + + public Input getInput() { + return input; + } + + public void setInput(Input input) { + this.input = input; + } + + /** + * Deriving classes should implement this at the minimum + * + * @param inputStr + * @param marker + */ + public void apply(String inputStr, InputMarker inputMarker) { + // TODO: There is no transformation for string types. 
+ if (nextFilter != null) { + nextFilter.apply(inputStr, inputMarker); + } else { + outputMgr.write(inputStr, inputMarker); + } + } + + public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) { + if (postFieldValueMappers.size() > 0) { + for (String fieldName : postFieldValueMappers.keySet()) { + Object value = jsonObj.get(fieldName); + if (value != null) { + for (Mapper mapper : postFieldValueMappers.get(fieldName)) { + value = mapper.apply(jsonObj, value); + } + } + } + } + if (nextFilter != null) { + nextFilter.apply(jsonObj, inputMarker); + } else { + outputMgr.write(jsonObj, inputMarker); + } + } + + /** + * + */ + public void close() { + if (nextFilter != null) { + nextFilter.close(); + } + } + + public void flush() { + + } + + @Override + public void logStat() { + super.logStat(); + if (nextFilter != null) { + nextFilter.logStat(); + } + } + + @Override + public boolean isFieldConditionMatch(String fieldName, String stringValue) { + if (!super.isFieldConditionMatch(fieldName, stringValue)) { + // Let's try input + if (input != null) { + return input.isFieldConditionMatch(fieldName, stringValue); + } else { + return false; + } + } + return true; + } + + @Override + public String getShortDescription() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean logConfgs(Priority level) { + if (!super.logConfgs(level)) { + return false; + } + logger.log(level, "input=" + input.getShortDescription()); + return true; + } + + @Override + public void addMetricsContainers(List<MetricCount> metricsList) { + super.addMetricsContainers(metricsList); + if (nextFilter != null) { + nextFilter.addMetricsContainers(metricsList); + } + } + +}
