Repository: ambari Updated Branches: refs/heads/branch-2.6 93820ddfd -> a22f66f71
AMBARI-22395. Log Feeder: cleanup checkpoint files periodically (oleewere) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a22f66f7 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a22f66f7 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a22f66f7 Branch: refs/heads/branch-2.6 Commit: a22f66f719fd11efd5ecfec09dc5a8112f9ada1b Parents: 93820dd Author: Oliver Szabo <[email protected]> Authored: Tue Nov 14 13:21:42 2017 +0100 Committer: Oliver Szabo <[email protected]> Committed: Tue Nov 14 18:27:47 2017 +0100 ---------------------------------------------------------------------- .../logfeeder/input/AbstractInputFile.java | 9 +++ .../apache/ambari/logfeeder/input/Input.java | 29 +++++--- .../ambari/logfeeder/input/InputManager.java | 12 ++++ .../input/monitor/AbstractLogFileMonitor.java | 5 +- .../input/monitor/CheckpointCleanupMonitor.java | 48 +++++++++++++ .../input/monitor/LogFileDetachMonitor.java | 2 +- .../input/monitor/LogFilePathUpdateMonitor.java | 2 +- .../logfeeder/metrics/LogFeederAMSClient.java | 3 + .../logfeeder/metrics/MetricsManager.java | 8 ++- .../apache/ambari/logfeeder/util/FileUtil.java | 38 ++++++---- .../logfeeder/metrics/MetrcisManagerTest.java | 9 ++- .../test-config/logfeeder/logfeeder.properties | 3 +- .../shipper-conf/input.config-storm.json | 75 ++++++++++++++++++++ .../streamline-1-TestAgg-2-3/6701/worker.log | 4 ++ 14 files changed, 210 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java index 8548a20..64f2cbc 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java @@ -54,6 +54,7 @@ public abstract class AbstractInputFile extends Input { private int checkPointIntervalMS; private Map<String, Object> jsonCheckPoint; private InputMarker lastCheckPointInputMarker; + private Integer maxAgeMin; @Override protected String getStatMetricName() { @@ -75,6 +76,7 @@ public abstract class AbstractInputFile extends Input { setClosed(true); logPath = getStringValue("path"); tail = getBooleanValue("tail", tail); + maxAgeMin = getIntValue("max_age_min", 0); checkPointIntervalMS = getIntValue("checkpoint.interval.ms", DEFAULT_CHECKPOINT_INTERVAL_MS); if (StringUtils.isEmpty(logPath)) { @@ -286,6 +288,9 @@ public abstract class AbstractInputFile extends Input { jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber)); jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS)); jsonCheckPoint.put("last_write_time_date", new Date()); + if (maxAgeMin != 0) { + jsonCheckPoint.put("max_age_min", maxAgeMin.toString()); + } String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint); @@ -326,4 +331,8 @@ public abstract class AbstractInputFile extends Input { return "input:source=" + getStringValue("source") + ", path=" + (!ArrayUtils.isEmpty(logFiles) ? logFiles[0].getAbsolutePath() : logPath); } + + public Integer getMaxAgeMin() { + return maxAgeMin; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java index 96320e9..7df0b6e 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java @@ -34,6 +34,7 @@ import org.apache.ambari.logfeeder.input.monitor.LogFilePathUpdateMonitor; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.output.Output; import org.apache.ambari.logfeeder.output.OutputManager; +import org.apache.ambari.logfeeder.util.FileUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.log4j.Logger; @@ -190,16 +191,20 @@ public abstract class Input extends ConfigBlock implements Runnable, Cloneable { String folderPath = folderFileEntry.getKey(); String filePath = new File(getFilePath()).getName(); String fullPathWithWildCard = String.format("%s/%s", folderPath, filePath); - clonedObject.setMultiFolder(false); - clonedObject.logFiles = folderFileEntry.getValue().toArray(new File[0]); // TODO: works only with tail - clonedObject.logPath = fullPathWithWildCard; - clonedObject.setLogFileDetacherThread(null); - clonedObject.setLogFilePathUpdaterThread(null); - clonedObject.setInputChildMap(new HashMap<String, InputFile>()); - Thread thread = new Thread(threadGroup, clonedObject, "file=" + fullPathWithWildCard); - clonedObject.setThread(thread); - inputChildMap.put(fullPathWithWildCard, clonedObject); - thread.start(); + if (clonedObject.getMaxAgeMin() != 0 && FileUtil.isFileTooOld(new File(fullPathWithWildCard), clonedObject.getMaxAgeMin().longValue())) { + LOG.info(String.format("File ('%s') is too old (max age min: %d), monitor thread not starting...", getFilePath(), clonedObject.getMaxAgeMin())); + } else { + clonedObject.setMultiFolder(false); + clonedObject.logFiles = folderFileEntry.getValue().toArray(new File[0]); // TODO: works only with tail + clonedObject.logPath = fullPathWithWildCard; + clonedObject.setLogFileDetacherThread(null); + clonedObject.setLogFilePathUpdaterThread(null); + clonedObject.setInputChildMap(new HashMap<String, InputFile>()); + Thread thread = new Thread(threadGroup, clonedObject, "file=" + fullPathWithWildCard); + clonedObject.setThread(thread); + inputChildMap.put(fullPathWithWildCard, clonedObject); + thread.start(); + } } public void stopChildInputFileThread(String folderPathKey) { @@ -208,7 +213,9 @@ public abstract class Input extends ConfigBlock implements Runnable, Cloneable { String fullPathWithWildCard = String.format("%s/%s", folderPathKey, filePath); if (inputChildMap.containsKey(fullPathWithWildCard)) { InputFile inputFile = inputChildMap.get(fullPathWithWildCard); - inputFile.getThread().interrupt(); + if (inputFile.getThread() != null && inputFile.getThread().isAlive()) { + inputFile.getThread().interrupt(); + } inputChildMap.remove(fullPathWithWildCard); } else { LOG.warn(fullPathWithWildCard + " not found as an input child."); http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java index 8e70850..9b472ad 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.ambari.logfeeder.input.monitor.CheckpointCleanupMonitor; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.FileUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; @@ -139,6 +140,10 @@ public class InputManager { if (isCheckPointFolderValid) { LOG.info("Using folder " + checkPointFolderFile + " for storing checkpoints"); } + // check checkpoint cleanup every 2000 min + Thread checkpointCleanupThread = new Thread(new CheckpointCleanupMonitor(this, 2000),"checkpoint_cleanup"); + checkpointCleanupThread.setDaemon(true); + checkpointCleanupThread.start(); } } @@ -269,6 +274,10 @@ public class InputManager { String logFilePath = (String) jsonCheckPoint.get("file_path"); String logFileKey = (String) jsonCheckPoint.get("file_key"); + Integer maxAgeMin = null; + if (jsonCheckPoint.containsKey("max_age_min")) { + maxAgeMin = Integer.parseInt(jsonCheckPoint.get("max_age_min").toString()); + } if (logFilePath != null && logFileKey != null) { boolean deleteCheckPointFile = false; File logFile = new File(logFilePath); @@ -279,6 +288,9 @@ public class InputManager { deleteCheckPointFile = true; LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" + logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath()); + } else if (maxAgeMin != null && maxAgeMin != 0 && FileUtil.isFileTooOld(logFile, maxAgeMin)) { + deleteCheckPointFile = true; + LOG.info("Checkpoint clean: File reached max age minutes (" + maxAgeMin + "):" + logFilePath); } } else { LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" + http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java index 3910b9b..f01b39b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java @@ -19,6 +19,7 @@ package org.apache.ambari.logfeeder.input.monitor; import org.apache.ambari.logfeeder.input.InputFile; +import org.apache.ambari.logfeeder.util.FileUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,10 +61,6 @@ public abstract class AbstractLogFileMonitor implements Runnable { } } - protected boolean isFileTooOld(File monitoredFile, long detachTimeMin) { - return (System.currentTimeMillis() - monitoredFile.lastModified()) > detachTimeMin * 60 * 1000; - } - protected abstract String getStartLog(); protected abstract void monitorAndUpdate() throws Exception; http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java new file mode 100644 index 0000000..b8377c1 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.input.monitor; + +import org.apache.ambari.logfeeder.input.InputManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CheckpointCleanupMonitor implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(CheckpointCleanupMonitor.class); + + private long waitIntervalMin; + private InputManager inputManager; + + public CheckpointCleanupMonitor(InputManager inputManager, long waitIntervalMin) { + this.waitIntervalMin = waitIntervalMin; + this.inputManager = inputManager; + } + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + try { + Thread.sleep(1000 * 60 * waitIntervalMin); + inputManager.cleanCheckPointFiles(); + } catch (Exception e) { + LOG.error("Cleanup checkpoint files thread interrupted.", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java index 322a56d..0eb8ce8 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java @@ -58,7 +58,7 @@ public class LogFileDetachMonitor extends AbstractLogFileMonitor { for (Map.Entry<String, InputFile> inputFileEntry : copiedInputFileMap.entrySet()) { if (inputFileEntry.getKey().startsWith(entry.getKey())) { File monitoredFile = entry.getValue().get(0); - boolean isFileTooOld = isFileTooOld(monitoredFile, getDetachTime()); + boolean isFileTooOld = FileUtil.isFileTooOld(monitoredFile, getDetachTime()); if (isFileTooOld) { LOG.info("File ('{}') in folder ('{}') is too old (reached {} minutes), detach input thread.", entry.getKey(), getDetachTime()); getInputFile().stopChildInputFileThread(entry.getKey()); http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java index cc5d664..fcc9618 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java @@ -63,7 +63,7 @@ public class LogFilePathUpdateMonitor extends AbstractLogFileMonitor { } else { LOG.info("New log file folder found: {}, start a new thread if tail file is not too old.", entry.getKey()); File monitoredFile = entry.getValue().get(0); - if (isFileTooOld(monitoredFile, getDetachTime())) { + if (FileUtil.isFileTooOld(monitoredFile, getDetachTime())) { LOG.info("'{}' file is too old. No new thread start needed.", monitoredFile.getAbsolutePath()); } else { getInputFile().startNewChildInputFileThread(entry); http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java index 2d1bf40..c0092c7 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java @@ -64,6 +64,9 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink { @Override public String getCollectorUri(String host) { + if (collectorProtocol == null || host == null || collectorPort == null || collectorPath == null) { + return null; + } return String.format("%s://%s:%s%s", collectorProtocol, host, collectorPort, collectorPath); } http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java index 1432c87..ebf7c6c 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java @@ -47,7 +47,9 @@ public class MetricsManager { public void init() { LOG.info("Initializing MetricsManager()"); - amsClient = new LogFeederAMSClient(); + if (amsClient == null) { + amsClient = new LogFeederAMSClient(); + } if (amsClient.getCollectorUri(null) != null) { findNodeHostName(); @@ -165,4 +167,8 @@ public class MetricsManager { (currMS - lastPublishTimeMS) / 1000 + " seconds ago, intervalConfigured=" + publishIntervalMS / 1000); } } + + public void setAmsClient(LogFeederAMSClient amsClient) { + this.amsClient = amsClient; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java index 843ae6b..86f0903 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java @@ -101,22 +101,26 @@ public class FileUtil { return new File[]{searchFile}; } else { if (searchPath.contains("*")) { - String folderBeforeRegex = getLogDirNameBeforeWildCard(searchPath); - String fileNameAfterLastFolder = searchPath.substring(folderBeforeRegex.length()); - - DirectoryScanner scanner = new DirectoryScanner(); - scanner.setIncludes(new String[]{fileNameAfterLastFolder}); - scanner.setBasedir(folderBeforeRegex); - scanner.setCaseSensitive(true); - scanner.scan(); - String[] fileNames = scanner.getIncludedFiles(); - - if (fileNames != null && fileNames.length > 0) { - File[] files = new File[fileNames.length]; - for (int i = 0; i < fileNames.length; i++) { - files[i] = new File(folderBeforeRegex + fileNames[i]); + try { + String folderBeforeRegex = getLogDirNameBeforeWildCard(searchPath); + String fileNameAfterLastFolder = searchPath.substring(folderBeforeRegex.length()); + + DirectoryScanner scanner = new DirectoryScanner(); + scanner.setIncludes(new String[]{fileNameAfterLastFolder}); + scanner.setBasedir(folderBeforeRegex); + scanner.setCaseSensitive(true); + scanner.scan(); + String[] fileNames = scanner.getIncludedFiles(); + + if (fileNames != null && fileNames.length > 0) { + File[] files = new File[fileNames.length]; + for (int i = 0; i < fileNames.length; i++) { + files[i] = new File(folderBeforeRegex + fileNames[i]); + } + return files; } - return files; + } catch (Exception e) { + LOG.warn("Input file not found by pattern (exception thrown); {}, message: {}", searchPath, e.getMessage()); } } else { @@ -164,4 +168,8 @@ public class FileUtil { return beforeRegex; } } + + public static boolean isFileTooOld(File file, long diffMin) { + return (System.currentTimeMillis() - file.lastModified()) > diffMin * 1000 * 60; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java index 8ee6d00..f8cbb18 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java @@ -50,18 +50,21 @@ public class MetrcisManagerTest { @Before public void init() throws Exception { manager = new MetricsManager(); - manager.init(); mockClient = strictMock(LogFeederAMSClient.class); Field f = MetricsManager.class.getDeclaredField("amsClient"); f.setAccessible(true); f.set(manager, mockClient); - + + EasyMock.expect(mockClient.getCollectorUri(null)).andReturn("null://null:null/null").anyTimes(); capture = EasyMock.newCapture(CaptureType.FIRST); mockClient.emitMetrics(EasyMock.capture(capture)); EasyMock.expectLastCall().andReturn(true).once(); - + replay(mockClient); + manager.setAmsClient(mockClient); + + manager.init(); } @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties ---------------------------------------------------------------------- diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties index 068bc3a..0a6c7a8 100644 --- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties +++ b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties @@ -23,7 +23,8 @@ logfeeder.config.files=shipper-conf/global.config.json,\ shipper-conf/input.config-system_message.json,\ shipper-conf/input.config-secure_log.json,\ shipper-conf/input.config-hdfs.json,\ - shipper-conf/input.config-ambari.json + shipper-conf/input.config-ambari.json,\ + shipper-conf/input.config-storm.json logfeeder.log.filter.enable=true logfeeder.solr.config.interval=5 logfeeder.solr.core.config.name=history http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json ---------------------------------------------------------------------- diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json new file mode 100644 index 0000000..34bdcf0 --- /dev/null +++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json @@ -0,0 +1,75 @@ +{ + "input":[ + { + "type":"storm_worker", + "rowtype":"service", + "path":"/root/test-logs/storm/worker-logs/*/*/worker.log", + "init_default_fields": "true" + } + ], + "filter":[ + { + "filter":"grok", + "sort_order": 1, + "conditions":{ + "fields":{ + "type":[ + "storm_worker" + ] + } + }, + "log4j_format":"", + "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})", + "message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}\\s%{JAVACLASS:logger_name}\\s%{DATA:thread_name}\\s\\[%{LOGLEVEL:level}\\]\\s%{GREEDYDATA:log_message}", + "post_map_values":{ + "logtime":{ + "map_date":{ + "target_date_pattern":"yyyy-MM-dd HH:mm:ss.SSS" + } + } + } + }, + { + "filter":"grok", + "sort_order": 2, + "conditions":{ + "fields":{ + "type":[ + "storm_worker" + ] + } + }, + "source_field": "thread_name", + "remove_source_field": "false", + "message_pattern":"(Thread\\-[\\-0-9]+\\-*[\\-0-9]*\\-%{DATA:storm_component_name}\\-executor%{DATA}|%{DATA})" + }, + { + "filter":"grok", + "sort_order": 3, + "conditions":{ + "fields":{ + "type":[ + "storm_worker" + ] + } + }, + "source_field": "path", + "remove_source_field": "false", + "message_pattern":"/root/test-logs/storm/worker-logs/%{DATA:storm_topology_id}/%{DATA:storm_worker_port}/worker\\.log" + }, + { + "filter":"grok", + "sort_order": 4, + "conditions":{ + "fields":{ + "type":[ + "storm_worker" + ] + } + }, + "source_field": "storm_topology_id", + "remove_source_field": "false", + "message_pattern":"(streamline\\-%{DATA:streamline_topology_id}\\-%{DATA:streamline_topology_name}\\-[0-9]+\\-[0-9]+)|(%{GREEDYDATA})" + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log ---------------------------------------------------------------------- diff --git a/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log new file mode 100644 index 0000000..6a10ad9 --- /dev/null +++ b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log @@ -0,0 +1,4 @@ +2017-10-23 13:41:43.481 o.a.s.d.executor Thread-11-__acker-executor[5 5] [INFO] Preparing bolt __acker:(5) +2017-10-23 13:41:43.483 o.a.s.d.executor Thread-11-__acker-executor[5 5] [INFO] Prepared bolt __acker:(5) +2017-10-23 13:41:48.834 c.h.s.s.n.EmailNotifier Thread-5-3-NOTIFICATION-executor[3 3] [ERROR] Got exception while initializing transport +2017-10-23 13:41:58.242 o.a.s.d.executor main [INFO] Loading executor 3-NOTIFICATION:[3 3] \ No newline at end of file
