http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java index 8f8c4fd..e8066be 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java @@ -27,60 +27,76 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; +import org.apache.ambari.logfeeder.common.ConfigHandler; +import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler; import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder; -import org.apache.log4j.Logger; import com.google.common.io.Files; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; public class InputConfigUploader extends Thread { - protected static final Logger LOG = Logger.getLogger(InputConfigUploader.class); + + protected static final Logger LOG = LoggerFactory.getLogger(InputConfigUploader.class); private static final long SLEEP_BETWEEN_CHECK = 2000; - private final File configDir; - private final FilenameFilter inputConfigFileFilter = new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.startsWith("input.config-") && name.endsWith(".json"); - } - }; + private File configDir; + private final FilenameFilter inputConfigFileFilter = (dir, name) -> name.startsWith("input.config-") && name.endsWith(".json"); private final Set<String> filesHandled = new HashSet<>(); private final Pattern serviceNamePattern = Pattern.compile("input.config-(.+).json"); - private final LogSearchConfigLogFeeder config; - - public static void load(LogSearchConfigLogFeeder config) { - new InputConfigUploader(config).start(); - } - - private InputConfigUploader(LogSearchConfigLogFeeder config) { + + @Inject + private LogSearchConfigLogFeeder config; + + @Inject + private LogFeederProps logFeederProps; + + @Inject + private LogLevelFilterHandler logLevelFilterHandler; + + @Inject + private ConfigHandler configHandler; + + public InputConfigUploader() { super("Input Config Loader"); setDaemon(true); - - this.configDir = new File(LogFeederPropertiesUtil.getConfigDir()); - this.config = config; + } + + @PostConstruct + public void init() throws Exception { + this.configDir = new File(logFeederProps.getConfDir()); + this.start(); + config.monitorInputConfigChanges(configHandler, logLevelFilterHandler, logFeederProps.getClusterName()); } @Override public void run() { while (true) { File[] inputConfigFiles = configDir.listFiles(inputConfigFileFilter); - for (File inputConfigFile : inputConfigFiles) { - if (!filesHandled.contains(inputConfigFile.getAbsolutePath())) { - try { - Matcher m = serviceNamePattern.matcher(inputConfigFile.getName()); - m.find(); - String serviceName = m.group(1); - String inputConfig = Files.toString(inputConfigFile, Charset.defaultCharset()); - - if (!config.inputConfigExists(serviceName)) { - config.createInputConfig(LogFeederPropertiesUtil.getClusterName(), serviceName, inputConfig); + if (inputConfigFiles != null) { + for (File inputConfigFile : inputConfigFiles) { + if (!filesHandled.contains(inputConfigFile.getAbsolutePath())) { + try { + Matcher m = serviceNamePattern.matcher(inputConfigFile.getName()); + m.find(); + String serviceName = m.group(1); + String inputConfig = Files.toString(inputConfigFile, Charset.defaultCharset()); + if (!config.inputConfigExists(serviceName)) { + config.createInputConfig(logFeederProps.getClusterName(), serviceName, inputConfig); + } + filesHandled.add(inputConfigFile.getAbsolutePath()); + } catch (Exception e) { + LOG.warn("Error handling file " + inputConfigFile.getAbsolutePath(), e); } - filesHandled.add(inputConfigFile.getAbsolutePath()); - } catch (Exception e) { - LOG.warn("Error handling file " + inputConfigFile.getAbsolutePath(), e); } } + } else { + LOG.warn("Cannot find input config files in config dir ({})", logFeederProps.getConfDir()); } try {
http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java index f1b422f..d1f38ed 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java @@ -33,22 +33,25 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import com.google.common.annotations.VisibleForTesting; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.FileUtil; -import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.io.filefilter.WildcardFileFilter; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; import org.apache.solr.common.util.Base64; +import javax.inject.Inject; + public class InputManager { private static final Logger LOG = Logger.getLogger(InputManager.class); private static final String CHECKPOINT_SUBFOLDER_NAME = "logfeeder_checkpoints"; private Map<String, List<Input>> inputs = new HashMap<>(); - private Set<Input> notReadyList = new HashSet<Input>(); + private Set<Input> notReadyList = new HashSet<>(); private boolean isDrain = false; @@ -59,6 +62,9 @@ public class InputManager { private Thread inputIsReadyMonitor; + @Inject + private LogFeederProps logFeederProps; + public List<Input> getInputList(String serviceName) { return inputs.get(serviceName); } @@ -118,11 +124,11 @@ public class InputManager { } private void initCheckPointSettings() { - checkPointExtension = LogFeederPropertiesUtil.getCheckPointExtension(); + checkPointExtension = logFeederProps.getCheckPointExtension(); LOG.info("Determining valid checkpoint folder"); boolean isCheckPointFolderValid = false; // We need to keep track of the files we are reading. - String checkPointFolder = LogFeederPropertiesUtil.getCheckpointFolder(); + String checkPointFolder = logFeederProps.getCheckpointFolder(); if (!StringUtils.isEmpty(checkPointFolder)) { checkPointFolderFile = new File(checkPointFolder); isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); @@ -130,8 +136,7 @@ public class InputManager { if (!isCheckPointFolderValid) { // Let's use tmp folder - String tmpFolder = LogFeederPropertiesUtil.getLogFeederTempDir(); - checkPointFolderFile = new File(tmpFolder, CHECKPOINT_SUBFOLDER_NAME); + checkPointFolderFile = new File(logFeederProps.getTmpDir(), CHECKPOINT_SUBFOLDER_NAME); LOG.info("Checking if tmp folder can be used for checkpoints. Folder=" + checkPointFolderFile); isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); if (isCheckPointFolderValid) { @@ -184,7 +189,7 @@ public class InputManager { public void startInputs(String serviceName) { for (Input input : inputs.get(serviceName)) { try { - input.init(); + input.init(logFeederProps); if (input.isReady()) { input.monitor(); } else { @@ -419,4 +424,13 @@ public class InputManager { } } } + + @VisibleForTesting + public void setLogFeederProps(LogFeederProps logFeederProps) { + this.logFeederProps = logFeederProps; + } + + public LogFeederProps getLogFeederProps() { + return logFeederProps; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java index df6c941..2b2d145 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java @@ -30,10 +30,11 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ambari.logfeeder.conf.InputSimulateConfig; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.filter.FilterJSON; import org.apache.ambari.logfeeder.output.Output; -import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor; import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl; @@ -65,32 +66,37 @@ public class InputSimulate extends Input { } private final Random random = new Random(System.currentTimeMillis()); - - private final List<String> types; - private final String level; - private final int numberOfWords; - private final int minLogWords; - private final int maxLogWords; - private final long sleepMillis; - private final String host; - - public InputSimulate() throws Exception { + + private InputSimulateConfig conf; + private List<String> types; + private String level; + private int numberOfWords; + private int minLogWords; + private int maxLogWords; + private long sleepMillis; + private String host; + + @Override + public void init(LogFeederProps logFeederProps) throws Exception { + super.init(logFeederProps); + conf = logFeederProps.getInputSimulateConfig(); this.types = getSimulatedLogTypes(); - this.level = LogFeederPropertiesUtil.getSimulateLogLevel(); - this.numberOfWords = LogFeederPropertiesUtil.getSimulateNumberOfWords(); - this.minLogWords = LogFeederPropertiesUtil.getSimulateMinLogWords(); - this.maxLogWords = LogFeederPropertiesUtil.getSimulateMaxLogWords(); - this.sleepMillis = LogFeederPropertiesUtil.getSimulateSleepMilliseconds(); + this.level = conf.getSimulateLogLevel(); + this.numberOfWords = conf.getSimulateNumberOfWords(); + this.minLogWords = conf.getSimulateMinLogWords(); + this.maxLogWords = conf.getSimulateMaxLogWords(); + this.sleepMillis = conf.getSimulateSleepMilliseconds(); this.host = "#" + hostNumber.incrementAndGet() + "-" + LogFeederUtil.hostName; - + Filter filter = new FilterJSON(); filter.loadConfig(new FilterJsonDescriptorImpl()); filter.setInput(this); addFilter(filter); + } - + private List<String> getSimulatedLogTypes() { - String logsToSimulate = LogFeederPropertiesUtil.getSimulateLogIds(); + String logsToSimulate = conf.getSimulateLogIds(); return (logsToSimulate == null) ? inputTypes : Arrays.asList(logsToSimulate.split(",")); http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java deleted file mode 100644 index 6173f53..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.loglevelfilter; - -import java.util.Map; - -import org.apache.ambari.logfeeder.common.LogFeederConstants; -import org.apache.ambari.logfeeder.input.InputMarker; -import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.log4j.Logger; - -public enum FilterLogData { - INSTANCE; - - private static final Logger LOG = Logger.getLogger(FilterLogData.class); - - private static final boolean DEFAULT_VALUE = true; - - public boolean isAllowed(String jsonBlock, InputMarker inputMarker) { - if (StringUtils.isEmpty(jsonBlock)) { - return DEFAULT_VALUE; - } - Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock); - return isAllowed(jsonObj, inputMarker); - } - - public boolean isAllowed(Map<String, Object> jsonObj, InputMarker inputMarker) { - if ("audit".equals(inputMarker.input.getInputDescriptor().getRowtype())) - return true; - - boolean isAllowed = applyFilter(jsonObj); - if (!isAllowed) { - LOG.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj)); - } - return isAllowed; - } - - - private boolean applyFilter(Map<String, Object> jsonObj) { - if (MapUtils.isEmpty(jsonObj)) { - LOG.warn("Output jsonobj is empty"); - return DEFAULT_VALUE; - } - - String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST); - String logId = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT); - String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL); - if (StringUtils.isNotBlank(hostName) && StringUtils.isNotBlank(logId) && StringUtils.isNotBlank(level)) { - return LogLevelFilterHandler.isAllowed(hostName, logId, level); - } else { - return DEFAULT_VALUE; - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java index e44873b..83c293b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java @@ -21,7 +21,6 @@ package org.apache.ambari.logfeeder.loglevelfilter; import java.text.DateFormat; import java.text.SimpleDateFormat; -import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -29,19 +28,28 @@ import java.util.Map; import java.util.TimeZone; import org.apache.ambari.logfeeder.common.LogFeederConstants; -import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; +import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor; import org.apache.ambari.logsearch.config.api.LogSearchConfig; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; public class LogLevelFilterHandler implements LogLevelFilterMonitor { - private static final Logger LOG = Logger.getLogger(LogLevelFilterHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(LogLevelFilterHandler.class); private static final String TIMEZONE = "GMT"; private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS"; + + private static final boolean DEFAULT_VALUE = true; private static ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() { protected DateFormat initialValue() { @@ -50,16 +58,19 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor { return dateFormat; } }; - - private static LogSearchConfig config; - private static boolean filterEnabled; - private static List<String> defaultLogLevels; - private static Map<String, LogLevelFilter> filters = new HashMap<>(); - - public static void init(LogSearchConfig config_) { - config = config_; - filterEnabled = LogFeederPropertiesUtil.isLogFilterEnabled(); - defaultLogLevels = Arrays.asList(LogFeederPropertiesUtil.getIncludeDefaultLevel().split(",")); + + @Inject + private LogFeederProps logFeederProps; + + private LogSearchConfig config; + private Map<String, LogLevelFilter> filters = new HashMap<>(); + + public LogLevelFilterHandler(LogSearchConfig config) { + this.config = config; + } + + @PostConstruct + public void init() { TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE)); } @@ -77,8 +88,8 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor { } } - public static boolean isAllowed(String hostName, String logId, String level) { - if (!filterEnabled) { + public boolean isAllowed(String hostName, String logId, String level) { + if (!logFeederProps.isLogLevelFilterEnabled()) { return true; } @@ -87,7 +98,43 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor { return allowedLevels.isEmpty() || allowedLevels.contains(level); } - private static synchronized LogLevelFilter findLogFilter(String logId) { + public boolean isAllowed(String jsonBlock, InputMarker inputMarker) { + if (org.apache.commons.lang3.StringUtils.isEmpty(jsonBlock)) { + return DEFAULT_VALUE; + } + Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock); + return isAllowed(jsonObj, inputMarker); + } + + public boolean isAllowed(Map<String, Object> jsonObj, InputMarker inputMarker) { + if ("audit".equals(inputMarker.input.getInputDescriptor().getRowtype())) + return true; + + boolean isAllowed = applyFilter(jsonObj); + if (!isAllowed) { + LOG.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj)); + } + return isAllowed; + } + + + public boolean applyFilter(Map<String, Object> jsonObj) { + if (MapUtils.isEmpty(jsonObj)) { + LOG.warn("Output jsonobj is empty"); + return DEFAULT_VALUE; + } + + String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST); + String logId = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT); + String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL); + if (org.apache.commons.lang3.StringUtils.isNotBlank(hostName) && org.apache.commons.lang3.StringUtils.isNotBlank(logId) && org.apache.commons.lang3.StringUtils.isNotBlank(level)) { + return isAllowed(hostName, logId, level); + } else { + return DEFAULT_VALUE; + } + } + + private synchronized LogLevelFilter findLogFilter(String logId) { LogLevelFilter logFilter = filters.get(logId); if (logFilter != null) { return logFilter; @@ -96,10 +143,10 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor { LOG.info("Filter is not present for log " + logId + ", creating default filter"); LogLevelFilter defaultFilter = new LogLevelFilter(); defaultFilter.setLabel(logId); - defaultFilter.setDefaultLevels(defaultLogLevels); + defaultFilter.setDefaultLevels(logFeederProps.getIncludeDefaultLogLevels()); try { - config.createLogLevelFilter(LogFeederPropertiesUtil.getClusterName(), logId, defaultFilter); + config.createLogLevelFilter(logFeederProps.getClusterName(), logId, defaultFilter); filters.put(logId, defaultFilter); } catch (Exception e) { LOG.warn("Could not persist the default filter for log " + logId, e); @@ -108,7 +155,7 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor { return defaultFilter; } - private static List<String> getAllowedLevels(String hostName, LogLevelFilter componentFilter) { + private List<String> getAllowedLevels(String hostName, LogLevelFilter componentFilter) { String componentName = componentFilter.getLabel(); List<String> hosts = componentFilter.getHosts(); List<String> defaultLevels = componentFilter.getDefaultLevels(); @@ -134,7 +181,7 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor { return defaultLevels; } - private static boolean isFilterExpired(LogLevelFilter logLevelFilter) { + private boolean isFilterExpired(LogLevelFilter logLevelFilter) { if (logLevelFilter == null) return false; http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java index c832358..ba986c7 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java @@ -19,15 +19,13 @@ package org.apache.ambari.logfeeder.metrics; -import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; -import org.apache.ambari.logfeeder.util.SSLUtil; +import org.apache.ambari.logfeeder.conf.LogFeederSecurityConfig; +import org.apache.ambari.logfeeder.conf.MetricsCollectorConfig; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.log4j.Logger; -import com.google.common.base.Splitter; - import java.util.Collection; import java.util.List; @@ -40,16 +38,16 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink { private final String collectorPort; private final String collectorPath; - public LogFeederAMSClient() { - String collectorHostsString = LogFeederPropertiesUtil.getMetricsCollectorHosts(); + public LogFeederAMSClient(MetricsCollectorConfig metricsCollectorConfig, LogFeederSecurityConfig securityConfig) { + String collectorHostsString = metricsCollectorConfig.getHostsString(); if (!StringUtils.isBlank(collectorHostsString)) { collectorHostsString = collectorHostsString.trim(); LOG.info("AMS collector Hosts=" + collectorHostsString); - collectorHosts = Splitter.on(",").splitToList(collectorHostsString); - collectorProtocol = LogFeederPropertiesUtil.getMetricsCollectorProtocol(); - collectorPort = LogFeederPropertiesUtil.getMetricsCollectorPort(); - collectorPath = LogFeederPropertiesUtil.getMetricsCollectorPath(); + collectorHosts = metricsCollectorConfig.getHosts(); + collectorProtocol = metricsCollectorConfig.getProtocol(); + collectorPort = metricsCollectorConfig.getPort(); + collectorPath = metricsCollectorConfig.getPath(); } else { collectorHosts = null; collectorProtocol = null; @@ -57,8 +55,8 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink { collectorPath = null; } - if (StringUtils.isNotBlank(SSLUtil.getTrustStoreLocation())) { - loadTruststore(SSLUtil.getTrustStoreLocation(), SSLUtil.getTrustStoreType(), SSLUtil.getTrustStorePassword()); + if (StringUtils.isNotBlank(securityConfig.getTrustStoreLocation())) { + loadTruststore(securityConfig.getTrustStoreLocation(), securityConfig.getTrustStoreType(), securityConfig.getTrustStorePassword()); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java index 6e8ac04..96084c1 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java @@ -24,11 +24,16 @@ import java.util.HashMap; import java.util.List; import java.util.TreeMap; +import org.apache.ambari.logfeeder.conf.LogFeederSecurityConfig; +import org.apache.ambari.logfeeder.conf.MetricsCollectorConfig; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.log4j.Logger; +import javax.annotation.PostConstruct; +import javax.inject.Inject; + public class MetricsManager { private static final Logger LOG = Logger.getLogger(MetricsManager.class); @@ -40,13 +45,20 @@ public class MetricsManager { private int publishIntervalMS = 60 * 1000; private int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep the metrics in memory forever - private HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>(); + private HashMap<String, TimelineMetric> metricsMap = new HashMap<>(); private LogFeederAMSClient amsClient = null; + @Inject + private MetricsCollectorConfig metricsCollectorConfig; + + @Inject + private LogFeederSecurityConfig logFeederSecurityConfig; + + @PostConstruct public void init() { LOG.info("Initializing MetricsManager()"); if (amsClient == null) { - amsClient = new LogFeederAMSClient(); + amsClient = new LogFeederAMSClient(metricsCollectorConfig, logFeederSecurityConfig); } if (amsClient.getCollectorUri(null) != null) { http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java new file mode 100644 index 0000000..1dd9287 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.metrics; + +import org.apache.ambari.logfeeder.common.ConfigHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.List; + +public class StatsLogger extends Thread { + + private static final Logger LOG = LoggerFactory.getLogger(StatsLogger.class); + + private static final int CHECKPOINT_CLEAN_INTERVAL_MS = 24 * 60 * 60 * 60 * 1000; // 24 hours + + private long lastCheckPointCleanedMS = 0; + + @Inject + private ConfigHandler configHandler; + + @Inject + private MetricsManager metricsManager; + + public StatsLogger() { + super("statLogger"); + setDaemon(true); + } + + @PostConstruct + public void init() { + this.start(); + } + + @Override + public void run() { + while (true) { + try { + Thread.sleep(30 * 1000); + } catch (Throwable t) { + // Ignore + } + try { + logStats(); + } catch (Throwable t) { + LOG.error("LogStats: Caught exception while logging stats.", t); + } + + if (System.currentTimeMillis() > (lastCheckPointCleanedMS + CHECKPOINT_CLEAN_INTERVAL_MS)) { + lastCheckPointCleanedMS = System.currentTimeMillis(); + configHandler.cleanCheckPointFiles(); + } + } + } + + private void logStats() { + configHandler.logStats(); + if (metricsManager.isMetricsEnabled()) { + List<MetricData> metricsList = new ArrayList<MetricData>(); + configHandler.addMetrics(metricsList); + metricsManager.useMetrics(metricsList); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java index e1a0bb9..4576deb 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.Map; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.csv.CSVFormat; @@ -41,8 +42,8 @@ public class OutputFile extends Output { private String codec; @Override - public void init() throws Exception { - super.init(); + public void init(LogFeederProps logFeederProps) throws Exception { + super.init(logFeederProps); filePath = getStringValue("path"); if (StringUtils.isEmpty(filePath)) { http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java index ba4d60a..ed66eb0 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java @@ -19,6 +19,7 @@ package org.apache.ambari.logfeeder.output; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.output.spool.LogSpooler; import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext; @@ -26,7 +27,6 @@ import org.apache.ambari.logfeeder.output.spool.RolloverCondition; import org.apache.ambari.logfeeder.output.spool.RolloverHandler; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil; -import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logfeeder.util.PlaceholderUtil; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; @@ -65,8 +65,8 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC private LogSpooler logSpooler; @Override - public void init() throws Exception { - super.init(); + public void init(LogFeederProps logFeederProps) throws Exception { + super.init(logFeederProps); hdfsOutDir = getStringValue("hdfs_out_dir"); hdfsHost = getStringValue("hdfs_host"); hdfsPort = getStringValue("hdfs_port"); @@ -88,7 +88,7 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC HashMap<String, String> contextParam = buildContextParam(); hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam); LOG.info("hdfs Output dir=" + hdfsOutDir); - String localFileDir = LogFeederPropertiesUtil.getLogFeederTempDir() + "hdfs/service/"; + String localFileDir = logFeederProps.getTmpDir() + "hdfs/service/"; logSpooler = new LogSpooler(localFileDir, filenamePrefix, this, this); this.startHDFSCopyThread(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java index 52fc6f8..5c8ec82 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java @@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedTransferQueue; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.lang3.StringUtils; @@ -66,8 +67,8 @@ public class OutputKafka extends Output { } @Override - public void init() throws Exception { - super.init(); + public void init(LogFeederProps logFeederProps) throws Exception { + super.init(logFeederProps); Properties props = initProperties(); producer = creteKafkaProducer(props); http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java index 48716fa..f5c4176 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java @@ -26,10 +26,12 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import com.google.common.annotations.VisibleForTesting; import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputMarker; -import org.apache.ambari.logfeeder.loglevelfilter.FilterLogData; +import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.util.MurmurHash; @@ -38,6 +40,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import javax.inject.Inject; + public class OutputManager { private static final Logger LOG = Logger.getLogger(OutputManager.class); @@ -51,6 +55,12 @@ public class OutputManager { private static long docCounter = 0; private MetricData messageTruncateMetric = new MetricData(null, false); + @Inject + private LogLevelFilterHandler logLevelFilterHandler; + + @Inject + private LogFeederProps logFeederProps; + private OutputLineFilter outputLineFilter = new OutputLineFilter(); public List<Output> getOutputs() { @@ -73,7 +83,7 @@ public class OutputManager { public void init() throws Exception { for (Output output : outputs) { - output.init(); + output.init(logFeederProps); } } @@ -146,7 +156,7 @@ public class OutputManager { jsonObj.put("message_md5", "" + MurmurHash.hash64A(logMessage.getBytes(), 31174077)); } } - if (FilterLogData.INSTANCE.isAllowed(jsonObj, inputMarker) + if (logLevelFilterHandler.isAllowed(jsonObj, inputMarker) && !outputLineFilter.apply(jsonObj, inputMarker.input)) { for (Output output : input.getOutputList()) { try { @@ -179,7 +189,7 @@ public class OutputManager { } public void write(String jsonBlock, InputMarker inputMarker) { - if (FilterLogData.INSTANCE.isAllowed(jsonBlock, inputMarker)) { + if (logLevelFilterHandler.isAllowed(jsonBlock, inputMarker)) { for (Output output : inputMarker.input.getOutputList()) { try { output.write(jsonBlock, inputMarker); @@ -255,4 +265,22 @@ public class OutputManager { } } } + + public LogLevelFilterHandler getLogLevelFilterHandler() { + return logLevelFilterHandler; + } + + @VisibleForTesting + public void setLogLevelFilterHandler(LogLevelFilterHandler logLevelFilterHandler) { + this.logLevelFilterHandler = logLevelFilterHandler; + } + + public LogFeederProps getLogFeederProps() { + return logFeederProps; + } + + @VisibleForTesting + public void setLogFeederProps(LogFeederProps logFeederProps) { + this.logFeederProps = logFeederProps; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java index 5b213e8..d8eed2b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java @@ -20,13 +20,13 @@ package org.apache.ambari.logfeeder.output; import com.google.common.annotations.VisibleForTesting; import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.output.spool.LogSpooler; import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext; import org.apache.ambari.logfeeder.output.spool.RolloverCondition; import org.apache.ambari.logfeeder.output.spool.RolloverHandler; -import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.util.S3Util; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; @@ -62,8 +62,8 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH private S3Uploader s3Uploader; @Override - public void init() throws Exception { - super.init(); + public void init(LogFeederProps logFeederProps) throws Exception { + super.init(logFeederProps); s3OutputConfiguration = S3OutputConfiguration.fromConfigBlock(this); } @@ -206,7 +206,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH @VisibleForTesting protected LogSpooler createSpooler(String filePath) { - String spoolDirectory = LogFeederPropertiesUtil.getLogFeederTempDir() + "/s3/service"; + String spoolDirectory = getLogFeederProps().getTmpDir() + "/s3/service"; LOG.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s", spoolDirectory, filePath)); return new LogSpooler(spoolDirectory, new File(filePath).getName()+"-", this, this, s3OutputConfiguration.getRolloverTimeThresholdSecs()); http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java index 38219df..cdb869a 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java @@ -35,9 +35,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.util.DateUtil; -import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties; import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; @@ -109,8 +109,8 @@ public class OutputSolr extends Output implements CollectionStateWatcher { } @Override - public void init() throws Exception { - super.init(); + public void init(LogFeederProps logFeederProps) throws Exception { + super.init(logFeederProps); initParams(); setupSecurity(); createOutgoingBuffer(); @@ -175,8 +175,8 @@ public class OutputSolr extends Output implements CollectionStateWatcher { } private void setupSecurity() { - String jaasFile = LogFeederPropertiesUtil.getSolrJaasFile(); - boolean securityEnabled = LogFeederPropertiesUtil.isSolrKerberosEnabled(); + String jaasFile = getLogFeederProps().getLogFeederSecurityConfig().getSolrJaasFile(); + boolean securityEnabled = getLogFeederProps().getLogFeederSecurityConfig().isSolrKerberosEnabled(); if (securityEnabled) { System.setProperty("java.security.auth.login.config", jaasFile); HttpClientUtil.addConfigurer(new Krb5HttpClientConfigurer()); http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederPropertiesUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederPropertiesUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederPropertiesUtil.java deleted file mode 100644 index 1636653..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederPropertiesUtil.java +++ /dev/null @@ -1,498 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.util; - -import java.io.BufferedInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.util.HashMap; -import java.util.Properties; - -import org.apache.ambari.logfeeder.LogFeeder; -import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; -import org.apache.commons.lang3.StringUtils; -import org.apache.log4j.Logger; - -/** - * This class contains utility methods used by LogFeeder - */ -public class LogFeederPropertiesUtil { - private static final Logger LOG = Logger.getLogger(LogFeederPropertiesUtil.class); - - public static final String LOGFEEDER_PROPERTIES_FILE = "logfeeder.properties"; - - private static Properties props; - public static Properties getProperties() { - return props; - } - - public static void loadProperties() throws Exception { - loadProperties(LOGFEEDER_PROPERTIES_FILE); - } - - /** - * This method will read the properties from System, followed by propFile and finally from the map - */ - public static void loadProperties(String propFile) throws Exception { - LOG.info("Loading properties. propFile=" + propFile); - props = new Properties(System.getProperties()); - boolean propLoaded = false; - - // First get properties file path from environment value - String propertiesFilePath = System.getProperty("properties"); - if (StringUtils.isNotEmpty(propertiesFilePath)) { - File propertiesFile = new File(propertiesFilePath); - if (propertiesFile.exists() && propertiesFile.isFile()) { - LOG.info("Properties file path set in environment. Loading properties file=" + propertiesFilePath); - try (FileInputStream fis = new FileInputStream(propertiesFile)) { - props.load(fis); - propLoaded = true; - } catch (Throwable t) { - LOG.error("Error loading properties file. properties file=" + propertiesFile.getAbsolutePath()); - } - } else { - LOG.error("Properties file path set in environment, but file not found. properties file=" + propertiesFilePath); - } - } - - if (!propLoaded) { - try (BufferedInputStream bis = (BufferedInputStream) LogFeeder.class.getClassLoader().getResourceAsStream(propFile)) { - // Properties not yet loaded, let's try from class loader - if (bis != null) { - LOG.info("Loading properties file " + propFile + " from classpath"); - props.load(bis); - propLoaded = true; - } else { - LOG.fatal("Properties file not found in classpath. properties file name= " + propFile); - } - } - } - - if (!propLoaded) { - LOG.fatal("Properties file is not loaded."); - throw new Exception("Properties not loaded"); - } - } - - public static String getStringProperty(String key) { - return props == null ? null : props.getProperty(key); - } - - public static String getStringProperty(String key, String defaultValue) { - return props == null ? defaultValue : props.getProperty(key, defaultValue); - } - - public static boolean getBooleanProperty(String key, boolean defaultValue) { - String value = getStringProperty(key); - return toBoolean(value, defaultValue); - } - - private static boolean toBoolean(String value, boolean defaultValue) { - if (StringUtils.isEmpty(value)) { - return defaultValue; - } - - return "true".equalsIgnoreCase(value) || "yes".equalsIgnoreCase(value); - } - - public static int getIntProperty(String key, int defaultValue) { - return getIntProperty(key, defaultValue, null, null); - } - - public static int getIntProperty(String key, int defaultValue, Integer minValue, Integer maxValue) { - String value = getStringProperty(key); - int retValue = LogFeederUtil.objectToInt(value, defaultValue, ", key=" + key); - if (minValue != null && retValue < minValue) { - LOG.info("Minimum rule was applied for " + key + ": " + retValue + " < " + minValue); - retValue = minValue; - } - if (maxValue != null && retValue > maxValue) { - LOG.info("Maximum rule was applied for " + key + ": " + retValue + " > " + maxValue); - retValue = maxValue; - } - return retValue; - } - - private static final String CLUSTER_NAME_PROPERTY = "cluster.name"; - - @LogSearchPropertyDescription( - name = CLUSTER_NAME_PROPERTY, - description = "The name of the cluster the Log Feeder program runs in.", - examples = {"cl1"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getClusterName() { - return getStringProperty(CLUSTER_NAME_PROPERTY); - } - - private static final String TMP_DIR_PROPERTY = "logfeeder.tmp.dir"; - private static final String DEFAULT_TMP_DIR = "/tmp/$username/logfeeder/"; - private static String logFeederTempDir = null; - - @LogSearchPropertyDescription( - name = TMP_DIR_PROPERTY, - description = "The tmp dir used for creating temporary files.", - examples = {"/tmp/"}, - defaultValue = DEFAULT_TMP_DIR, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public synchronized static String getLogFeederTempDir() { - if (logFeederTempDir == null) { - String tempDirValue = getStringProperty(TMP_DIR_PROPERTY, DEFAULT_TMP_DIR); - HashMap<String, String> contextParam = new HashMap<String, String>(); - String username = System.getProperty("user.name"); - contextParam.put("username", username); - logFeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue, contextParam); - } - return logFeederTempDir; - } - - public static final String CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY = "hadoop.security.credential.provider.path"; - - @LogSearchPropertyDescription( - name = CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY, - description = "The jceks file that provides passwords.", - examples = {"jceks://file/etc/ambari-logsearch-logfeeder/conf/logfeeder.jceks"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getCredentialStoreProviderPath() { - return getStringProperty(CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY); - } - - private static final String CONFIG_FILES_PROPERTY = "logfeeder.config.files"; - - @LogSearchPropertyDescription( - name = CONFIG_FILES_PROPERTY, - description = "Comma separated list of the config files containing global / output configurations.", - examples = {"global.json,output.json", "/etc/ambari-logsearch-logfeeder/conf/global.json"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getConfigFiles() { - return getStringProperty(CONFIG_FILES_PROPERTY); - } - - private static final String CONFIG_DIR_PROPERTY = "logfeeder.config.dir"; - - @LogSearchPropertyDescription( - name = CONFIG_DIR_PROPERTY, - description = "The directory where shipper configuration files are looked for.", - examples = {"/etc/ambari-logsearch-logfeeder/conf"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getConfigDir() { - return getStringProperty(CONFIG_DIR_PROPERTY); - } - - public static final String CHECKPOINT_EXTENSION_PROPERTY = "logfeeder.checkpoint.extension"; - public static final String DEFAULT_CHECKPOINT_EXTENSION = ".cp"; - - @LogSearchPropertyDescription( - name = CHECKPOINT_EXTENSION_PROPERTY, - description = "The extension used for checkpoint files.", - examples = {"ckp"}, - defaultValue = DEFAULT_CHECKPOINT_EXTENSION, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getCheckPointExtension() { - return getStringProperty(CHECKPOINT_EXTENSION_PROPERTY, DEFAULT_CHECKPOINT_EXTENSION); - } - - private static final String CHECKPOINT_FOLDER_PROPERTY = "logfeeder.checkpoint.folder"; - - @LogSearchPropertyDescription( - name = CHECKPOINT_FOLDER_PROPERTY, - description = "The folder wher checkpoint files are stored.", - examples = {"/etc/ambari-logsearch-logfeeder/conf/checkpoints"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getCheckpointFolder() { - return getStringProperty(CHECKPOINT_FOLDER_PROPERTY); - } - - private static final String CACHE_ENABLED_PROPERTY = "logfeeder.cache.enabled"; - private static final boolean DEFAULT_CACHE_ENABLED = false; - - @LogSearchPropertyDescription( - name = CACHE_ENABLED_PROPERTY, - description = "Enables the usage of a cache to avoid duplications.", - examples = {"true"}, - defaultValue = DEFAULT_CACHE_ENABLED + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static boolean isCacheEnabled() { - return getBooleanProperty(CACHE_ENABLED_PROPERTY, DEFAULT_CACHE_ENABLED); - } - - private static final String CACHE_KEY_FIELD_PROPERTY = "logfeeder.cache.key.field"; - private static final String DEFAULT_CACHE_KEY_FIELD = "log_message"; - - @LogSearchPropertyDescription( - name = CACHE_KEY_FIELD_PROPERTY, - description = "The field which's value should be cached and should be checked for repteitions.", - examples = {"some_field_prone_to_repeating_value"}, - defaultValue = DEFAULT_CACHE_KEY_FIELD, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getCacheKeyField() { - return getStringProperty(CACHE_KEY_FIELD_PROPERTY, DEFAULT_CACHE_KEY_FIELD); - } - - private static final String CACHE_SIZE_PROPERTY = "logfeeder.cache.size"; - private static final int DEFAULT_CACHE_SIZE = 100; - - @LogSearchPropertyDescription( - name = CACHE_SIZE_PROPERTY, - description = "The number of log entries to cache in order to avoid duplications.", - examples = {"50"}, - defaultValue = DEFAULT_CACHE_SIZE + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static int getCacheSize() { - return getIntProperty(CACHE_SIZE_PROPERTY, DEFAULT_CACHE_SIZE); - } - - private static final String CACHE_LAST_DEDUP_ENABLED_PROPERTY = "logfeeder.cache.last.dedup.enabled"; - private static final boolean DEFAULT_CACHE_LAST_DEDUP_ENABLED = false; - - @LogSearchPropertyDescription( - name = CACHE_LAST_DEDUP_ENABLED_PROPERTY, - description = "Enable filtering directly repeating log entries irrelevant of the time spent between them.", - examples = {"true"}, - defaultValue = DEFAULT_CACHE_LAST_DEDUP_ENABLED + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static boolean isCacheLastDedupEnabled() { - return getBooleanProperty(CACHE_LAST_DEDUP_ENABLED_PROPERTY, DEFAULT_CACHE_LAST_DEDUP_ENABLED); - } - - private static final String CACHE_DEDUP_INTERVAL_PROPERTY = "logfeeder.cache.dedup.interval"; - private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000; - - @LogSearchPropertyDescription( - name = CACHE_DEDUP_INTERVAL_PROPERTY, - description = "Maximum number of milliseconds between two identical messages to be filtered out.", - examples = {"500"}, - defaultValue = DEFAULT_CACHE_DEDUP_INTERVAL + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getCacheDedupInterval() { - return getStringProperty(CACHE_DEDUP_INTERVAL_PROPERTY, String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL)); - } - - private static final String LOG_FILTER_ENABLE_PROPERTY = "logfeeder.log.filter.enable"; - private static final boolean DEFAULT_LOG_FILTER_ENABLE = false; - - @LogSearchPropertyDescription( - name = LOG_FILTER_ENABLE_PROPERTY, - description = "Enables the filtering of the log entries by log level filters.", - examples = {"true"}, - defaultValue = DEFAULT_LOG_FILTER_ENABLE + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static boolean isLogFilterEnabled() { - return getBooleanProperty(LOG_FILTER_ENABLE_PROPERTY, DEFAULT_LOG_FILTER_ENABLE); - } - - private static final String INCLUDE_DEFAULT_LEVEL_PROPERTY = "logfeeder.include.default.level"; - - @LogSearchPropertyDescription( - name = INCLUDE_DEFAULT_LEVEL_PROPERTY, - description = "Comma separtaed list of the default log levels to be enabled by the filtering.", - examples = {"FATAL,ERROR,WARN"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getIncludeDefaultLevel() { - return getStringProperty(INCLUDE_DEFAULT_LEVEL_PROPERTY); - } - - private static final String DEFAULT_SOLR_JAAS_FILE = "/etc/security/keytabs/logsearch_solr.service.keytab"; - private static final String SOLR_JAAS_FILE_PROPERTY = "logfeeder.solr.jaas.file"; - - @LogSearchPropertyDescription( - name = SOLR_JAAS_FILE_PROPERTY, - description = "The jaas file used for solr.", - examples = {"/etc/ambari-logsearch-logfeeder/conf/logfeeder_jaas.conf"}, - defaultValue = DEFAULT_SOLR_JAAS_FILE, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getSolrJaasFile() { - return getStringProperty(SOLR_JAAS_FILE_PROPERTY, DEFAULT_SOLR_JAAS_FILE); - } - - private static final String SOLR_KERBEROS_ENABLE_PROPERTY = "logfeeder.solr.kerberos.enable"; - private static final boolean DEFAULT_SOLR_KERBEROS_ENABLE = false; - - @LogSearchPropertyDescription( - name = SOLR_KERBEROS_ENABLE_PROPERTY, - description = "Enables using kerberos for accessing solr.", - examples = {"true"}, - defaultValue = DEFAULT_SOLR_KERBEROS_ENABLE + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static boolean isSolrKerberosEnabled() { - return getBooleanProperty(SOLR_KERBEROS_ENABLE_PROPERTY, DEFAULT_SOLR_KERBEROS_ENABLE); - } - - private static final String METRICS_COLLECTOR_HOSTS_PROPERTY = "logfeeder.metrics.collector.hosts"; - - @LogSearchPropertyDescription( - name = METRICS_COLLECTOR_HOSTS_PROPERTY, - description = "Comma separtaed list of metric collector hosts.", - examples = {"c6401.ambari.apache.org"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getMetricsCollectorHosts() { - return getStringProperty(METRICS_COLLECTOR_HOSTS_PROPERTY); - } - - private static final String METRICS_COLLECTOR_PROTOCOL_PROPERTY = "logfeeder.metrics.collector.protocol"; - - @LogSearchPropertyDescription( - name = METRICS_COLLECTOR_PROTOCOL_PROPERTY, - description = "The protocol used by metric collectors.", - examples = {"http", "https"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getMetricsCollectorProtocol() { - return getStringProperty(METRICS_COLLECTOR_PROTOCOL_PROPERTY); - } - - private static final String METRICS_COLLECTOR_PORT_PROPERTY = "logfeeder.metrics.collector.port"; - - @LogSearchPropertyDescription( - name = METRICS_COLLECTOR_PORT_PROPERTY, - description = "The port used by metric collectors.", - examples = {"6188"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getMetricsCollectorPort() { - return getStringProperty(METRICS_COLLECTOR_PORT_PROPERTY); - } - - private static final String METRICS_COLLECTOR_PATH_PROPERTY = "logfeeder.metrics.collector.path"; - - @LogSearchPropertyDescription( - name = METRICS_COLLECTOR_PATH_PROPERTY, - description = "The path used by metric collectors.", - examples = {"/ws/v1/timeline/metrics"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getMetricsCollectorPath() { - return getStringProperty(METRICS_COLLECTOR_PATH_PROPERTY); - } - - private static final String SIMULATE_INPUT_NUMBER_PROPERTY = "logfeeder.simulate.input_number"; - private static final int DEFAULT_SIMULATE_INPUT_NUMBER = 0; - - @LogSearchPropertyDescription( - name = SIMULATE_INPUT_NUMBER_PROPERTY, - description = "The number of the simulator instances to run with. O means no simulation.", - examples = {"10"}, - defaultValue = DEFAULT_SIMULATE_INPUT_NUMBER + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static int getSimulateInputNumber() { - return getIntProperty(SIMULATE_INPUT_NUMBER_PROPERTY, DEFAULT_SIMULATE_INPUT_NUMBER); - } - - private static final String SIMULATE_LOG_LEVEL_PROPERTY = "logfeeder.simulate.log_level"; - private static final String DEFAULT_SIMULATE_LOG_LEVEL = "WARN"; - - @LogSearchPropertyDescription( - name = SIMULATE_LOG_LEVEL_PROPERTY, - description = "The log level to create the simulated log entries with.", - examples = {"INFO"}, - defaultValue = DEFAULT_SIMULATE_LOG_LEVEL, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getSimulateLogLevel() { - return getStringProperty(SIMULATE_LOG_LEVEL_PROPERTY, DEFAULT_SIMULATE_LOG_LEVEL); - } - - private static final String SIMULATE_NUMBER_OF_WORDS_PROPERTY = "logfeeder.simulate.number_of_words"; - private static final int DEFAULT_SIMULATE_NUMBER_OF_WORDS = 1000; - - @LogSearchPropertyDescription( - name = SIMULATE_NUMBER_OF_WORDS_PROPERTY, - description = "The size of the set of words that may be used to create the simulated log entries with.", - examples = {"100"}, - defaultValue = DEFAULT_SIMULATE_NUMBER_OF_WORDS + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static int getSimulateNumberOfWords() { - return getIntProperty(SIMULATE_NUMBER_OF_WORDS_PROPERTY, DEFAULT_SIMULATE_NUMBER_OF_WORDS, 50, 1000000); - } - - private static final String SIMULATE_MIN_LOG_WORDS_PROPERTY = "logfeeder.simulate.min_log_words"; - private static final int DEFAULT_SIMULATE_MIN_LOG_WORDS = 5; - - @LogSearchPropertyDescription( - name = SIMULATE_MIN_LOG_WORDS_PROPERTY, - description = "The minimum number of words in a simulated log entry.", - examples = {"3"}, - defaultValue = DEFAULT_SIMULATE_MIN_LOG_WORDS + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static int getSimulateMinLogWords() { - return getIntProperty(SIMULATE_MIN_LOG_WORDS_PROPERTY, DEFAULT_SIMULATE_MIN_LOG_WORDS, 1, 10); - } - - private static final String SIMULATE_MAX_LOG_WORDS_PROPERTY = "logfeeder.simulate.max_log_words"; - private static final int DEFAULT_SIMULATE_MAX_LOG_WORDS = 5; - - @LogSearchPropertyDescription( - name = SIMULATE_MAX_LOG_WORDS_PROPERTY, - description = "The maximum number of words in a simulated log entry.", - examples = {"8"}, - defaultValue = DEFAULT_SIMULATE_MAX_LOG_WORDS + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static int getSimulateMaxLogWords() { - return getIntProperty(SIMULATE_MAX_LOG_WORDS_PROPERTY, DEFAULT_SIMULATE_MAX_LOG_WORDS, 10, 20); - } - - private static final String SIMULATE_SLEEP_MILLISECONDS_PROPERTY = "logfeeder.simulate.sleep_milliseconds"; - private static final int DEFAULT_SIMULATE_SLEEP_MILLISECONDS = 10000; - - @LogSearchPropertyDescription( - name = SIMULATE_SLEEP_MILLISECONDS_PROPERTY, - description = "The milliseconds to sleep between creating two simulated log entries.", - examples = {"5000"}, - defaultValue = DEFAULT_SIMULATE_SLEEP_MILLISECONDS + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static int getSimulateSleepMilliseconds() { - return getIntProperty(SIMULATE_SLEEP_MILLISECONDS_PROPERTY, DEFAULT_SIMULATE_SLEEP_MILLISECONDS); - } - - private static final String SIMULATE_LOG_IDS_PROPERTY = "logfeeder.simulate.log_ids"; - - @LogSearchPropertyDescription( - name = SIMULATE_LOG_IDS_PROPERTY, - description = "The comma separated list of log ids for which to create the simulated log entries.", - examples = {"ambari_server,zookeeper,infra_solr,logsearch_app"}, - defaultValue = "The log ids of the installed services in the cluster", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static String getSimulateLogIds() { - return getStringProperty(SIMULATE_LOG_IDS_PROPERTY); - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SSLUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SSLUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SSLUtil.java deleted file mode 100644 index 6bcaac9..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SSLUtil.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.util; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang3.ArrayUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.Logger; - -import java.io.File; -import java.nio.charset.Charset; - -public class SSLUtil { - private static final Logger LOG = Logger.getLogger(SSLUtil.class); - - private static final String KEYSTORE_LOCATION_ARG = "javax.net.ssl.keyStore"; - private static final String TRUSTSTORE_LOCATION_ARG = "javax.net.ssl.trustStore"; - private static final String KEYSTORE_TYPE_ARG = "javax.net.ssl.keyStoreType"; - private static final String TRUSTSTORE_TYPE_ARG = "javax.net.ssl.trustStoreType"; - private static final String KEYSTORE_PASSWORD_ARG = "javax.net.ssl.keyStorePassword"; - private static final String TRUSTSTORE_PASSWORD_ARG = "javax.net.ssl.trustStorePassword"; - private static final String KEYSTORE_PASSWORD_PROPERTY_NAME = "logfeeder_keystore_password"; - private static final String TRUSTSTORE_PASSWORD_PROPERTY_NAME = "logfeeder_truststore_password"; - private static final String KEYSTORE_PASSWORD_FILE = "ks_pass.txt"; - private static final String TRUSTSTORE_PASSWORD_FILE = "ts_pass.txt"; - - private static final String LOGFEEDER_CERT_DEFAULT_FOLDER = "/etc/ambari-logsearch-portal/conf/keys"; - private static final String LOGFEEDER_STORE_DEFAULT_PASSWORD = "bigdata"; - - private SSLUtil() { - throw new UnsupportedOperationException(); - } - - public static String getKeyStoreLocation() { - return System.getProperty(KEYSTORE_LOCATION_ARG); - } - - public static String getKeyStoreType() { - return System.getProperty(KEYSTORE_TYPE_ARG); - } - - public static String getKeyStorePassword() { - return System.getProperty(KEYSTORE_PASSWORD_ARG); - } - - public static String getTrustStoreLocation() { - return System.getProperty(TRUSTSTORE_LOCATION_ARG); - } - - public static String getTrustStoreType() { - return System.getProperty(TRUSTSTORE_TYPE_ARG); - } - - public static String getTrustStorePassword() { - return System.getProperty(TRUSTSTORE_PASSWORD_ARG); - } - - public static void ensureStorePasswords() { - ensureStorePassword(KEYSTORE_LOCATION_ARG, KEYSTORE_PASSWORD_ARG, KEYSTORE_PASSWORD_PROPERTY_NAME, KEYSTORE_PASSWORD_FILE); - ensureStorePassword(TRUSTSTORE_LOCATION_ARG, TRUSTSTORE_PASSWORD_ARG, TRUSTSTORE_PASSWORD_PROPERTY_NAME, TRUSTSTORE_PASSWORD_FILE); - } - - private static void ensureStorePassword(String locationArg, String pwdArg, String propertyName, String fileName) { - if (StringUtils.isNotEmpty(System.getProperty(locationArg)) && StringUtils.isEmpty(System.getProperty(pwdArg))) { - String password = getPassword(propertyName, fileName); - System.setProperty(pwdArg, password); - } - } - - private static String getPassword(String propertyName, String fileName) { - String credentialStorePassword = getPasswordFromCredentialStore(propertyName); - if (credentialStorePassword != null) { - return credentialStorePassword; - } - - String filePassword = getPasswordFromFile(fileName); - if (filePassword != null) { - return filePassword; - } - - return LOGFEEDER_STORE_DEFAULT_PASSWORD; - } - - private static String getPasswordFromCredentialStore(String propertyName) { - try { - String providerPath = LogFeederPropertiesUtil.getCredentialStoreProviderPath(); - if (providerPath == null) { - return null; - } - - Configuration config = new Configuration(); - config.set(LogFeederPropertiesUtil.CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY, providerPath); - char[] passwordChars = config.getPassword(propertyName); - return (ArrayUtils.isNotEmpty(passwordChars)) ? new String(passwordChars) : null; - } catch (Exception e) { - LOG.warn(String.format("Could not load password %s from credential store, using default password", propertyName)); - return null; - } - } - - private static String getPasswordFromFile(String fileName) { - try { - File pwdFile = new File(LOGFEEDER_CERT_DEFAULT_FOLDER, fileName); - if (!pwdFile.exists()) { - FileUtils.writeStringToFile(pwdFile, LOGFEEDER_STORE_DEFAULT_PASSWORD, Charset.defaultCharset()); - return LOGFEEDER_STORE_DEFAULT_PASSWORD; - } else { - return FileUtils.readFileToString(pwdFile, Charset.defaultCharset()); - } - } catch (Exception e) { - LOG.warn("Exception occurred during read/write password file for keystore/truststore.", e); - return null; - } - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log-samples/shipper-conf/input.config-sample.json ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log-samples/shipper-conf/input.config-sample.json b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log-samples/shipper-conf/input.config-sample.json index 4ab2eb2..e54c974 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log-samples/shipper-conf/input.config-sample.json +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log-samples/shipper-conf/input.config-sample.json @@ -22,7 +22,7 @@ "post_map_values": { "logtime": { "map_date": { - "date_pattern": "yyyy-MM-dd HH:mm:ss,SSS" + "target_date_pattern": "yyyy-MM-dd HH:mm:ss,SSS" } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log4j.xml b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log4j.xml index eb20665..d01160c 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log4j.xml +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log4j.xml @@ -23,7 +23,6 @@ <param name="Target" value="System.out" /> <layout class="org.apache.log4j.PatternLayout"> <param name="ConversionPattern" value="%d [%t] %-5p %C{6} (%F:%L) - %m%n" /> - <!-- <param name="ConversionPattern" value="%d [%t] %-5p %c %x - %m%n"/> --> </layout> </appender> @@ -43,10 +42,8 @@ <param name="maxFileSize" value="10MB" /> <param name="maxBackupIndex" value="10" /> <layout class="org.apache.ambari.logsearch.appender.LogsearchConversion" /> - </appender> - + </appender> - <!-- Logs to suppress BEGIN --> <category name="org.apache.solr.common.cloud.ZkStateReader" additivity="false"> <priority value="error" /> <appender-ref ref="daily_rolling_file" /> @@ -56,19 +53,15 @@ <priority value="fatal" /> <appender-ref ref="daily_rolling_file" /> </category> - <!-- Logs to suppress END --> <category name="org.apache.ambari.logfeeder" additivity="false"> <priority value="INFO" /> - <appender-ref ref="console" /> - <!-- <appender-ref ref="daily_rolling_file" /> --> - <appender-ref ref="rolling_file_json"/> + <appender-ref ref="console" /> </category> <root> - <priority value="warn" /> - <!-- <appender-ref ref="console" /> --> - <!-- <appender-ref ref="daily_rolling_file" /> --> + <priority value="info"/> + <appender-ref ref="console"/> </root> </log4j:configuration> http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties index 115778b..4b49446 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties @@ -14,23 +14,27 @@ # limitations under the License. cluster.name=cl1 -logfeeder.checkpoint.folder=target/checkpoints +logsearch.config.zk_connect_string=localhost:2181 + logfeeder.metrics.collector.hosts= -logfeeder.config.dir=target/classes/log-samples/shipper-conf/ -logfeeder.config.files=target/classes/log-samples/shipper-conf/global.config.json,\ - target/classes/log-samples/shipper-conf/input.config-sample.json,\ - target/classes/log-samples/shipper-conf/output.config-sample.json +logfeeder.checkpoint.folder=${LOGFEEDER_RELATIVE_LOCATION:}target/checkpoints +logfeeder.config.dir=${LOGFEEDER_RELATIVE_LOCATION:}target/classes/log-samples/shipper-conf/ +logfeeder.config.files=${LOGFEEDER_RELATIVE_LOCATION:}target/classes/log-samples/shipper-conf/global.config.json,\ + ${LOGFEEDER_RELATIVE_LOCATION:}target/classes/log-samples/shipper-conf/output.config-sample.json + logfeeder.log.filter.enable=true + logfeeder.solr.config.interval=5 logfeeder.solr.core.config.name=history logfeeder.solr.zk_connect_string=localhost:2181 + logfeeder.cache.enabled=true logfeeder.cache.size=100 logfeeder.cache.key.field=log_message logfeeder.cache.dedup.interval=1000 logfeeder.cache.last.dedup.enabled=true -logsearch.config.zk_connect_string=localhost:2181 + logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN #logfeeder tmp dir -logfeeder.tmp.dir=target/tmp +logfeeder.tmp.dir=${LOGFEEDER_RELATIVE_LOCATION:}target/tmp http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java index 8d7e86c..e3a822a 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java @@ -20,6 +20,7 @@ package org.apache.ambari.logfeeder.filter; import java.util.Map; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.output.OutputManager; @@ -52,7 +53,7 @@ public class FilterGrokTest { filterGrok.loadConfig(filterGrokDescriptor); filterGrok.setOutputManager(mockOutputManager); filterGrok.setInput(EasyMock.mock(Input.class)); - filterGrok.init(); + filterGrok.init(new LogFeederProps()); } @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java index acc3d4d..ef10c46 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java @@ -26,6 +26,7 @@ import java.util.TimeZone; import org.apache.ambari.logfeeder.common.LogFeederConstants; import org.apache.ambari.logfeeder.common.LogFeederException; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.output.OutputManager; import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl; @@ -54,7 +55,7 @@ public class FilterJSONTest { filterJson = new FilterJSON(); filterJson.loadConfig(filterJsonDescriptor); filterJson.setOutputManager(mockOutputManager); - filterJson.init(); + filterJson.init(new LogFeederProps()); } @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java index ae978fb..4a85b88 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java @@ -20,6 +20,7 @@ package org.apache.ambari.logfeeder.filter; import java.util.Map; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.output.OutputManager; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterKeyValueDescriptor; import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterKeyValueDescriptorImpl; @@ -49,7 +50,7 @@ public class FilterKeyValueTest { filterKeyValue = new FilterKeyValue(); filterKeyValue.loadConfig(filterKeyValueDescriptor); filterKeyValue.setOutputManager(mockOutputManager); - filterKeyValue.init(); + filterKeyValue.init(new LogFeederProps()); } @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java index efebc08..01b4e54 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java @@ -24,6 +24,8 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputFileDescriptorImpl; @@ -61,6 +63,8 @@ public class InputFileTest { private InputMarker testInputMarker; + private LogFeederProps logFeederProps; + @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -74,6 +78,9 @@ public class InputFileTest { @Before public void setUp() throws Exception { + logFeederProps = new LogFeederProps(); + LogEntryCacheConfig logEntryCacheConfig = new LogEntryCacheConfig(); + logFeederProps.setLogEntryCacheConfig(logEntryCacheConfig); } public void init(String path) throws Exception { @@ -87,7 +94,7 @@ public class InputFileTest { Filter capture = new Filter() { @Override - public void init() { + public void init(LogFeederProps logFeederProps) { } @Override @@ -103,7 +110,7 @@ public class InputFileTest { inputFile = new InputFile(); inputFile.loadConfig(inputFileDescriptor); inputFile.addFilter(capture); - inputFile.init(); + inputFile.init(logFeederProps); } @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/31e8e55a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java index 46fbc3b..9dba349 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.*; import java.util.ArrayList; import java.util.List; +import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.metrics.MetricData; import org.junit.Test; @@ -62,10 +63,12 @@ public class InputManagerTest { Input input1 = strictMock(Input.class); Input input2 = strictMock(Input.class); Input input3 = strictMock(Input.class); - - input1.init(); expectLastCall(); - input2.init(); expectLastCall(); - input3.init(); expectLastCall(); + + LogFeederProps logFeederProps = new LogFeederProps(); + + input1.init(logFeederProps); expectLastCall(); + input2.init(logFeederProps); expectLastCall(); + input3.init(logFeederProps); expectLastCall(); expect(input1.isReady()).andReturn(true); expect(input2.isReady()).andReturn(true); @@ -78,6 +81,7 @@ public class InputManagerTest { replay(input1, input2, input3); InputManager manager = new InputManager(); + manager.setLogFeederProps(logFeederProps); manager.add("serviceName", input1); manager.add("serviceName", input2); manager.add("serviceName", input3);