Repository: ambari Updated Branches: refs/heads/AMBARI-21145 1a3fcd234 -> a38dd28d4 (forced update)
AMBARI-21145. Allow wildcard for log directory folder in the path component of Logfeeder input (oleewere) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/178c8a1f Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/178c8a1f Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/178c8a1f Branch: refs/heads/AMBARI-21145 Commit: 178c8a1f5414a9e4e11627ad9846ed2b7d5c8c22 Parents: 268c964 Author: Oliver Szabo <[email protected]> Authored: Mon Oct 9 21:17:38 2017 +0200 Committer: Oliver Szabo <[email protected]> Committed: Wed Oct 18 20:35:13 2017 +0200 ---------------------------------------------------------------------- .../ambari-logsearch-logfeeder/pom.xml | 5 + .../ambari/logfeeder/filter/FilterGrok.java | 16 ++- .../logfeeder/input/AbstractInputFile.java | 10 ++ .../apache/ambari/logfeeder/input/Input.java | 118 ++++++++++++++++++- .../ambari/logfeeder/input/InputFile.java | 22 ++-- .../input/monitor/AbstractLogFileMonitor.java | 70 +++++++++++ .../input/monitor/LogFileDetachMonitor.java | 79 +++++++++++++ .../input/monitor/LogFilePathUpdateMonitor.java | 74 ++++++++++++ .../logfeeder/logconfig/LogConfigHandler.java | 2 +- .../apache/ambari/logfeeder/util/FileUtil.java | 83 ++++++++++++- ambari-logsearch/docker/Dockerfile | 6 +- ambari-logsearch/docker/logsearch-docker.sh | 4 +- 12 files changed, 460 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/178c8a1f/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml index 49122e8..3a0524d 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml +++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml @@ -154,6 +154,11 @@ <artifactId>commons-io</artifactId> <version>${common.io.version}</version> </dependency> + <dependency> + <groupId>org.apache.ant</groupId> + <artifactId>ant</artifactId> + <version>1.7.1</version> + </dependency> </dependencies> <build> <finalName>LogFeeder</finalName> http://git-wip-us.apache.org/repos/asf/ambari/blob/178c8a1f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java index 7e2da70..deff1b2 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java @@ -87,11 +87,15 @@ public class FilterGrok extends Filter { LOG.error("message_pattern is not set for filter."); return; } - extractNamedParams(messagePattern, namedParamList); grokMessage = new Grok(); loadPatterns(grokMessage); grokMessage.compile(messagePattern); + if (getBooleanValue("deep_extract", false)) { + extractNamedParams(grokMessage.getNamedRegexCollection()); + } else { + extractNamedParams(messagePattern, namedParamList); + } if (!StringUtils.isEmpty(multilinePattern)) { extractNamedParams(multilinePattern, multiLineamedParamList); @@ -108,6 +112,16 @@ public class FilterGrok extends Filter { } + private void extractNamedParams(Map<String, String> namedRegexCollection) { + if (namedRegexCollection != null) { + for (String paramValue : namedRegexCollection.values()) { + if (paramValue.toLowerCase().equals(paramValue)) { + namedParamList.add(paramValue); + } + } + } + } + private String escapePattern(String inPattern) { String inStr = inPattern; if (inStr != null) { http://git-wip-us.apache.org/repos/asf/ambari/blob/178c8a1f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java index 41a1fa5..8548a20 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java @@ -89,6 +89,16 @@ public abstract class AbstractInputFile extends Input { } setFilePath(logPath); + // Check there can have pattern in folder + if (getFilePath() != null && getFilePath().contains("/")) { + int lastIndexOfSlash = getFilePath().lastIndexOf("/"); + String folderBeforeLogName = getFilePath().substring(0, lastIndexOfSlash); + if (folderBeforeLogName.contains("*")) { + LOG.info("Found regex in folder path ('" + getFilePath() + "'), will check against multiple folders."); + setMultiFolder(true); + } + } + boolean isFileReady = isReady(); LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady); http://git-wip-us.apache.org/repos/asf/ambari/blob/178c8a1f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java index 9f54d8a..6f6bb6d 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java @@ -29,13 +29,15 @@ import org.apache.ambari.logfeeder.input.cache.LRUCache; import org.apache.ambari.logfeeder.common.ConfigBlock; import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.filter.Filter; +import org.apache.ambari.logfeeder.input.monitor.LogFileDetachMonitor; +import org.apache.ambari.logfeeder.input.monitor.LogFilePathUpdateMonitor; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.output.Output; import org.apache.ambari.logfeeder.output.OutputManager; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.log4j.Logger; -public abstract class Input extends ConfigBlock implements Runnable { +public abstract class Input extends ConfigBlock implements Runnable, Cloneable { private static final Logger LOG = Logger.getLogger(Input.class); private static final boolean DEFAULT_TAIL = true; @@ -46,6 +48,9 @@ public abstract class Input extends ConfigBlock implements Runnable { private static final int DEFAULT_CACHE_SIZE = 100; private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000; private static final String DEFAULT_CACHE_KEY_FIELD = "log_message"; + private static final int DEFAULT_DETACH_INTERVAL_MIN = 300; + private static final int DEFAULT_DETACH_TIME_MIN = 2000; + private static final int DEFAULT_LOG_PATH_UPDATE_INTERVAL_MIN = 5; private static final String CACHE_ENABLED = "cache_enabled"; private static final String CACHE_KEY_FIELD = "cache_key_field"; @@ -58,6 +63,12 @@ public abstract class Input extends ConfigBlock implements Runnable { private List<Output> outputList = new ArrayList<Output>(); private Thread thread; + private Thread logFileDetacherThread; + private Thread logFilePathUpdaterThread; + private ThreadGroup threadGroup; + private int detachIntervalMin; + private int pathUpdateIntervalMin; + private int detachTimeMin; private String type; protected String filePath; private Filter firstFilter; @@ -69,6 +80,9 @@ public abstract class Input extends ConfigBlock implements Runnable { private LRUCache cache; private String cacheKeyField; + private boolean multiFolder = false; + private Map<String, List<File>> folderMap; + private Map<String, InputFile> inputChildMap = new HashMap<>(); // TODO: weird it has this relationship protected MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false); protected String getReadBytesMetricName() { @@ -79,6 +93,9 @@ public abstract class Input extends ConfigBlock implements Runnable { public void loadConfig(Map<String, Object> map) { super.loadConfig(map); String typeValue = getStringValue("type"); + detachIntervalMin = getIntValue("detach_interval_min", DEFAULT_DETACH_INTERVAL_MIN * 60); + detachTimeMin = getIntValue("detach_time_min", DEFAULT_DETACH_TIME_MIN * 60); + pathUpdateIntervalMin = getIntValue("path_update_interval_min", DEFAULT_LOG_PATH_UPDATE_INTERVAL_MIN * 60); if (typeValue != null) { // Explicitly add type and value to field list contextFields.put("type", typeValue); @@ -131,20 +148,70 @@ public abstract class Input extends ConfigBlock implements Runnable { if (firstFilter != null) { firstFilter.init(); } - } boolean monitor() { if (isReady()) { - LOG.info("Starting thread. " + getShortDescription()); - thread = new Thread(this, getNameForThread()); - thread.start(); + if (multiFolder) { + try { + threadGroup = new ThreadGroup(getNameForThread()); + if (getFolderMap() != null) { + for (Map.Entry<String, List<File>> folderFileEntry : getFolderMap().entrySet()) { + startNewChildInputFileThread(folderFileEntry); + } + logFilePathUpdaterThread = new Thread(new LogFilePathUpdateMonitor((InputFile) this, pathUpdateIntervalMin, detachTimeMin), "logfile_path_updater=" + filePath); + logFilePathUpdaterThread.setDaemon(true); + logFileDetacherThread = new Thread(new LogFileDetachMonitor((InputFile) this, detachIntervalMin, detachTimeMin), "logfile_detacher=" + filePath); + logFileDetacherThread.setDaemon(true); + + logFilePathUpdaterThread.start(); + logFileDetacherThread.start(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + LOG.info("Starting thread. " + getShortDescription()); + thread = new Thread(this, getNameForThread()); + thread.start(); + } return true; } else { return false; } } + public void startNewChildInputFileThread(Map.Entry<String, List<File>> folderFileEntry) throws CloneNotSupportedException { + LOG.info("Start child input thread - " + folderFileEntry.getKey()); + InputFile clonedObject = (InputFile) this.clone(); + String folderPath = folderFileEntry.getKey(); + String filePath = new File(getFilePath()).getName(); + String fullPathWithWildCard = String.format("%s/%s", folderPath, filePath); + clonedObject.setMultiFolder(false); + clonedObject.logFiles = folderFileEntry.getValue().toArray(new File[0]); // TODO: works only with tail + clonedObject.logPath = fullPathWithWildCard; + clonedObject.setLogFileDetacherThread(null); + clonedObject.setLogFilePathUpdaterThread(null); + clonedObject.setInputChildMap(new HashMap<String, InputFile>()); + Thread thread = new Thread(threadGroup, clonedObject, "file=" + fullPathWithWildCard); + clonedObject.setThread(thread); + inputChildMap.put(fullPathWithWildCard, clonedObject); + thread.start(); + } + + public void stopChildInputFileThread(String folderPathKey) { + LOG.info("Stop child input thread - " + folderPathKey); + String filePath = new File(getFilePath()).getName(); + String fullPathWithWildCard = String.format("%s/%s", folderPathKey, filePath); + if (inputChildMap.containsKey(fullPathWithWildCard)) { + InputFile inputFile = inputChildMap.get(fullPathWithWildCard); + inputFile.getThread().interrupt(); + inputChildMap.remove(fullPathWithWildCard); + } else { + LOG.warn(fullPathWithWildCard + " not found as an input child."); + } + } + public abstract boolean isReady(); @Override @@ -192,7 +259,14 @@ public abstract class Input extends ConfigBlock implements Runnable { LOG.info("Request to drain. " + getShortDescription()); super.setDrain(drain); try { - thread.interrupt(); + if (multiFolder) { + logFileDetacherThread.interrupt(); + logFilePathUpdaterThread.interrupt(); + threadGroup.interrupt(); + } + if (thread != null) { + thread.interrupt(); + } } catch (Throwable t) { // ignore } @@ -318,6 +392,14 @@ public abstract class Input extends ConfigBlock implements Runnable { this.cacheKeyField = cacheKeyField; } + public Map<String, List<File>> getFolderMap() { + return folderMap; + } + + public void setFolderMap(Map<String, List<File>> folderMap) { + this.folderMap = folderMap; + } + @Override public String getNameForThread() { if (filePath != null) { @@ -334,4 +416,28 @@ public abstract class Input extends ConfigBlock implements Runnable { public String toString() { return getShortDescription(); } + + public void setMultiFolder(boolean multiFolder) { + this.multiFolder = multiFolder; + } + + public void setLogFileDetacherThread(Thread logFileDetacherThread) { + this.logFileDetacherThread = logFileDetacherThread; + } + + public void setLogFilePathUpdaterThread(Thread logFilePathUpdaterThread) { + this.logFilePathUpdaterThread = logFilePathUpdaterThread; + } + + public Map<String, InputFile> getInputChildMap() { + return inputChildMap; + } + + public void setInputChildMap(Map<String, InputFile> inputChildMap) { + this.inputChildMap = inputChildMap; + } + + public void setThread(Thread thread) { + this.thread = thread; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/178c8a1f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java index 3737839..fe7a279 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java @@ -20,12 +20,12 @@ package org.apache.ambari.logfeeder.input; import java.io.BufferedReader; import java.io.File; -import java.io.FileFilter; import java.io.FileNotFoundException; +import java.util.List; +import java.util.Map; import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory; import org.apache.ambari.logfeeder.util.FileUtil; -import org.apache.commons.io.filefilter.WildcardFileFilter; import org.apache.commons.lang3.ArrayUtils; import org.apache.solr.common.util.Base64; @@ -35,7 +35,9 @@ public class InputFile extends AbstractInputFile { public boolean isReady() { if (!isReady) { // Let's try to check whether the file is available - logFiles = getActualFiles(logPath); + logFiles = getActualInputLogFiles(); + Map<String, List<File>> foldersMap = FileUtil.getFoldersForFiles(logFiles); + setFolderMap(foldersMap); if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) { if (tail && logFiles.length > 1) { LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath + @@ -50,16 +52,6 @@ public class InputFile extends AbstractInputFile { return isReady; } - private File[] getActualFiles(String searchPath) { - File searchFile = new File(searchPath); - if (searchFile.isFile()) { - return new File[]{searchFile}; - } else { - FileFilter fileFilter = new WildcardFileFilter(searchFile.getName()); - return searchFile.getParentFile().listFiles(fileFilter); - } - } - @Override void start() throws Exception { boolean isProcessFile = getBooleanValue("process_file", true); @@ -99,6 +91,10 @@ public class InputFile extends AbstractInputFile { return FileUtil.getFileKey(logFile); } + public File[] getActualInputLogFiles() { + return FileUtil.getInputFilesByPattern(logPath); + } + private void copyFiles(File[] files) { boolean isCopyFile = getBooleanValue("copy_file", false); if (isCopyFile && files != null) { http://git-wip-us.apache.org/repos/asf/ambari/blob/178c8a1f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java new file mode 100644 index 0000000..3910b9b --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.input.monitor; + +import org.apache.ambari.logfeeder.input.InputFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +public abstract class AbstractLogFileMonitor implements Runnable { + + private Logger LOG = LoggerFactory.getLogger(AbstractLogFileMonitor.class); + + private final InputFile inputFile; + private final int waitInterval; + private final int detachTime; + + AbstractLogFileMonitor(InputFile inputFile, int waitInterval, int detachTime) { + this.inputFile = inputFile; + this.waitInterval = waitInterval; + this.detachTime = detachTime; + } + + public InputFile getInputFile() { + return inputFile; + } + + public int getDetachTime() { + return detachTime; + } + + @Override + public void run() { + LOG.info(getStartLog()); + + while (!Thread.currentThread().isInterrupted()) { + try { + Thread.sleep(1000 * waitInterval); + monitorAndUpdate(); + } catch (Exception e) { + LOG.error("Monitor thread interrupted.", e); + } + } + } + + protected boolean isFileTooOld(File monitoredFile, long detachTimeMin) { + return (System.currentTimeMillis() - monitoredFile.lastModified()) > detachTimeMin * 60 * 1000; + } + + protected abstract String getStartLog(); + + protected abstract void monitorAndUpdate() throws Exception; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/178c8a1f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java new file mode 100644 index 0000000..322a56d --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.input.monitor; + +import org.apache.ambari.logfeeder.input.InputFile; +import org.apache.ambari.logfeeder.util.FileUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Detach log files in case of folders do not exist or monitored files are too old + */ +public class LogFileDetachMonitor extends AbstractLogFileMonitor { + + private Logger LOG = LoggerFactory.getLogger(LogFileDetachMonitor.class); + + public LogFileDetachMonitor(InputFile inputFile, int interval, int detachTime) { + super(inputFile, interval, detachTime); + } + + @Override + public String getStartLog() { + return "Start file detach monitor thread for " + getInputFile().getFilePath(); + } + + @Override + protected void monitorAndUpdate() throws Exception { + File[] logFiles = getInputFile().getActualInputLogFiles(); + Map<String, List<File>> actualFolderMap = FileUtil.getFoldersForFiles(logFiles); + + // create map copies + Map<String, InputFile> copiedInputFileMap = new HashMap<>(getInputFile().getInputChildMap()); + Map<String, List<File>> copiedFolderMap = new HashMap<>(getInputFile().getFolderMap()); + // detach old entries + for (Map.Entry<String, List<File>> entry : copiedFolderMap.entrySet()) { + if (new File(entry.getKey()).exists()) { + for (Map.Entry<String, InputFile> inputFileEntry : copiedInputFileMap.entrySet()) { + if (inputFileEntry.getKey().startsWith(entry.getKey())) { + File monitoredFile = entry.getValue().get(0); + boolean isFileTooOld = isFileTooOld(monitoredFile, getDetachTime()); + if (isFileTooOld) { + LOG.info("File ('{}') in folder ('{}') is too old (reached {} minutes), detach input thread.", entry.getKey(), getDetachTime()); + getInputFile().stopChildInputFileThread(entry.getKey()); + } + } + } + } else { + LOG.info("Folder not exists. ({}) Stop thread.", entry.getKey()); + for (Map.Entry<String, InputFile> inputFileEntry : copiedInputFileMap.entrySet()) { + if (inputFileEntry.getKey().startsWith(entry.getKey())) { + getInputFile().stopChildInputFileThread(entry.getKey()); + getInputFile().setFolderMap(actualFolderMap); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/178c8a1f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java new file mode 100644 index 0000000..cc5d664 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.input.monitor; + +import org.apache.ambari.logfeeder.input.InputFile; +import org.apache.ambari.logfeeder.util.FileUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; +import java.util.Map; + +/** + * Update log file paths periodically, useful if the log file name pattern format is like: mylog-2017-10-09.log (so the tail one can change) + */ +public class LogFilePathUpdateMonitor extends AbstractLogFileMonitor { + + private Logger LOG = LoggerFactory.getLogger(LogFilePathUpdateMonitor.class); + + public LogFilePathUpdateMonitor(InputFile inputFile, int interval, int detachTime) { + super(inputFile, interval, detachTime); + } + + @Override + public String getStartLog() { + return "Start file path update monitor thread for " + getInputFile().getFilePath(); + } + + @Override + protected void monitorAndUpdate() throws Exception { + File[] logFiles = getInputFile().getActualInputLogFiles(); + Map<String, List<File>> foldersMap = FileUtil.getFoldersForFiles(logFiles); + Map<String, List<File>> originalFoldersMap = getInputFile().getFolderMap(); + for (Map.Entry<String, List<File>> entry : foldersMap.entrySet()) { + if (originalFoldersMap.keySet().contains(entry.getKey())) { + List<File> originalLogFiles = originalFoldersMap.get(entry.getKey()); + if (!entry.getValue().isEmpty()) { // check tail only for now + File lastFile = entry.getValue().get(0); + if (!originalLogFiles.get(0).getAbsolutePath().equals(lastFile.getAbsolutePath())) { + LOG.info("New file found (old: '{}', new: {}), reload thread for {}", + lastFile.getAbsolutePath(), originalLogFiles.get(0).getAbsolutePath(), entry.getKey()); + getInputFile().stopChildInputFileThread(entry.getKey()); + getInputFile().startNewChildInputFileThread(entry); + } + } + } else { + LOG.info("New log file folder found: {}, start a new thread if tail file is not too old.", entry.getKey()); + File monitoredFile = entry.getValue().get(0); + if (isFileTooOld(monitoredFile, getDetachTime())) { + LOG.info("'{}' file is too old. No new thread start needed.", monitoredFile.getAbsolutePath()); + } else { + getInputFile().startNewChildInputFileThread(entry); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/178c8a1f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java index 0ece637..6a052d0 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java @@ -200,7 +200,7 @@ public class LogConfigHandler extends Thread { try { Thread.sleep(RETRY_INTERVAL * 1000); } catch (InterruptedException e) { - LOG.error(e); + LOG.warn(e); } LOG.info("Checking if config is available"); http://git-wip-us.apache.org/repos/asf/ambari/blob/178c8a1f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java index ffd6cec..843ae6b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java @@ -27,17 +27,22 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.io.FileUtils; -import org.apache.log4j.Logger; +import org.apache.tools.ant.DirectoryScanner; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FileUtil { - private static final Logger LOG = Logger.getLogger(FileUtil.class); - + private static final Logger LOG = LoggerFactory.getLogger(FileUtil.class); + private static final String FOLDER_SEPARATOR = "/"; + private FileUtil() { throw new UnsupportedOperationException(); } @@ -85,8 +90,78 @@ public class FileUtil { HashMap<String, Object> jsonmap = mapper.readValue(jsonFile, new TypeReference<HashMap<String, Object>>() {}); return jsonmap; } catch (IOException e) { - LOG.error(e, e.getCause()); + LOG.error("{}", e); } return new HashMap<String, Object>(); } + + public static File[] getInputFilesByPattern(String searchPath) { + File searchFile = new File(searchPath); + if (searchFile.isFile()) { + return new File[]{searchFile}; + } else { + if (searchPath.contains("*")) { + String folderBeforeRegex = getLogDirNameBeforeWildCard(searchPath); + String fileNameAfterLastFolder = searchPath.substring(folderBeforeRegex.length()); + + DirectoryScanner scanner = new DirectoryScanner(); + scanner.setIncludes(new String[]{fileNameAfterLastFolder}); + scanner.setBasedir(folderBeforeRegex); + scanner.setCaseSensitive(true); + scanner.scan(); + String[] fileNames = scanner.getIncludedFiles(); + + if (fileNames != null && fileNames.length > 0) { + File[] files = new File[fileNames.length]; + for (int i = 0; i < fileNames.length; i++) { + files[i] = new File(folderBeforeRegex + fileNames[i]); + } + return files; + } + + } else { + LOG.warn("Input file config not found by pattern; {}", searchPath); + } + return new File[]{}; + } + } + + public static Map<String, List<File>> getFoldersForFiles(File[] inputFiles) { + Map<String, List<File>> foldersMap = new HashMap<>(); + if (inputFiles != null && inputFiles.length > 0) { + for (File inputFile : inputFiles) { + File folder = inputFile.getParentFile(); + if (folder.exists()) { + if (foldersMap.containsKey(folder.getAbsolutePath())) { + foldersMap.get(folder.getAbsolutePath()).add(inputFile); + } else { + List<File> fileList = new ArrayList<>(); + fileList.add(inputFile); + foldersMap.put(folder.getAbsolutePath(), fileList); + } + } + } + } + if (!foldersMap.isEmpty()) { + for (Map.Entry<String, List<File>> entry : foldersMap.entrySet()) { + Collections.sort(entry.getValue(), Collections.reverseOrder()); + } + } + return foldersMap; + } + + private static String getLogDirNameBeforeWildCard(String pattern) { + String[] splitByFirstRegex = pattern.split("\\*"); + String beforeRegex = splitByFirstRegex[0]; + if (beforeRegex.contains(FOLDER_SEPARATOR)) { + int endIndex = beforeRegex.lastIndexOf(FOLDER_SEPARATOR); + String parentFolder = beforeRegex; + if (endIndex != -1) { + parentFolder = beforeRegex.substring(0, endIndex) + FOLDER_SEPARATOR; + } + return parentFolder; + } else { + return beforeRegex; + } + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/178c8a1f/ambari-logsearch/docker/Dockerfile ---------------------------------------------------------------------- diff --git a/ambari-logsearch/docker/Dockerfile b/ambari-logsearch/docker/Dockerfile index dfa1462..e67836b 100644 --- a/ambari-logsearch/docker/Dockerfile +++ b/ambari-logsearch/docker/Dockerfile @@ -22,8 +22,10 @@ RUN yum -y install glibc-common ENV HOME /root #Install JAVA -RUN wget --no-check-certificate --no-cookies --header "Cookie:oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/7u55-b13/jdk-7u55-linux-x64.rpm -O jdk-7u55-linux-x64.rpm -RUN rpm -ivh jdk-7u55-linux-x64.rpm +ENV JAVA_VERSION 8u131 +ENV BUILD_VERSION b11 +RUN wget --no-check-certificate --no-cookies --header "Cookie:oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/$JAVA_VERSION-$BUILD_VERSION/d54c1d3a095b4ff2b6607d096fa80163/jdk-$JAVA_VERSION-linux-x64.rpm -O jdk-8-linux-x64.rpm +RUN rpm -ivh jdk-8-linux-x64.rpm ENV JAVA_HOME /usr/java/default/ #Install Maven http://git-wip-us.apache.org/repos/asf/ambari/blob/178c8a1f/ambari-logsearch/docker/logsearch-docker.sh ---------------------------------------------------------------------- diff --git a/ambari-logsearch/docker/logsearch-docker.sh b/ambari-logsearch/docker/logsearch-docker.sh index a2df90f..fa54bc1 100755 --- a/ambari-logsearch/docker/logsearch-docker.sh +++ b/ambari-logsearch/docker/logsearch-docker.sh @@ -26,7 +26,7 @@ function build_logsearch_project() { function build_logsearch_container() { pushd $sdir - docker build -t ambari-logsearch:v1.0 . + docker build -t ambari-logsearch:v0.5 . popd } @@ -41,7 +41,7 @@ function start_logsearch_container() { -v $AMBARI_LOCATION:/root/ambari -v $MAVEN_REPOSITORY_LOCATION:/root/.m2 $LOGSEARCH_EXPOSED_PORTS $LOGSEARCH_ENV_OPTS $LOGSEARCH_EXTRA_OPTS $LOGSEARCH_VOLUME_OPTS \ -v $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-logfeeder/target/classes:/root/ambari/ambari-logsearch/ambari-logsearch-logfeeder/target/package/classes \ -v $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-portal/target/classes:/root/ambari/ambari-logsearch/ambari-logsearch-portal/target/package/classes \ - -v $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-portal/src/main/webapp:/root/ambari/ambari-logsearch/ambari-logsearch-portal/target/package/classes/webapps/app ambari-logsearch:v1.0 + -v $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-portal/src/main/webapp:/root/ambari/ambari-logsearch/ambari-logsearch-portal/target/package/classes/webapps/app ambari-logsearch:v0.5 ip_address=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' logsearch) echo "Log Search container started on $ip_address (for Mac OSX route to boot2docker/docker-machine VM address, e.g.: 'sudo route add -net 172.17.0.0/16 192.168.59.103')" echo "You can follow Log Search logs with 'docker logs -f logsearch' command"
