AMBARI-21592 Log Feeder properties should be handled by one class (mgergely)
Change-Id: I3193f3b7a4b7f64ed7d60191f446640500b46a0e Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/27386c3d Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/27386c3d Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/27386c3d Branch: refs/heads/branch-feature-logsearch-ui Commit: 27386c3d9eae250a72f3b23308c752e5e32765c1 Parents: 98f15c9 Author: Miklos Gergely <mgerg...@hortonworks.com> Authored: Fri Aug 4 11:03:51 2017 +0200 Committer: Miklos Gergely <mgerg...@hortonworks.com> Committed: Fri Aug 4 11:03:51 2017 +0200 ---------------------------------------------------------------------- .../org/apache/ambari/logfeeder/LogFeeder.java | 10 +- .../ambari/logfeeder/common/ConfigHandler.java | 27 +- .../logfeeder/input/AbstractInputFile.java | 4 +- .../apache/ambari/logfeeder/input/Input.java | 70 +-- .../logfeeder/input/InputConfigUploader.java | 17 +- .../ambari/logfeeder/input/InputManager.java | 28 +- .../ambari/logfeeder/input/InputSimulate.java | 77 +-- .../loglevelfilter/LogLevelFilterHandler.java | 29 +- .../logfeeder/metrics/LogFeederAMSClient.java | 45 +- .../ambari/logfeeder/output/OutputHDFSFile.java | 3 +- .../ambari/logfeeder/output/OutputS3File.java | 3 +- .../ambari/logfeeder/output/OutputSolr.java | 39 +- .../logfeeder/util/LogFeederPropertiesUtil.java | 498 +++++++++++++++++++ .../ambari/logfeeder/util/LogFeederUtil.java | 165 +----- .../apache/ambari/logfeeder/util/SSLUtil.java | 14 +- .../logconfig/LogConfigHandlerTest.java | 4 +- .../logfeeder/metrics/MetricsManagerTest.java | 4 +- 17 files changed, 575 insertions(+), 462 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java index ba3412b..2461819 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -36,7 +36,7 @@ import org.apache.ambari.logfeeder.input.InputConfigUploader; import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.metrics.MetricsManager; -import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logfeeder.util.SSLUtil; import com.google.common.collect.Maps; import com.google.gson.GsonBuilder; @@ -80,13 +80,13 @@ public class LogFeeder { SSLUtil.ensureStorePasswords(); - config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER, Maps.fromProperties(LogFeederUtil.getProperties()), - LogFeederUtil.getClusterName(), LogSearchConfigZK.class); + config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER,Maps.fromProperties(LogFeederPropertiesUtil.getProperties()), + LogFeederPropertiesUtil.getClusterName(), LogSearchConfigZK.class); configHandler = new ConfigHandler(config); configHandler.init(); LogLevelFilterHandler.init(config); InputConfigUploader.load(config); - config.monitorInputConfigChanges(configHandler, new LogLevelFilterHandler(), LogFeederUtil.getClusterName()); + config.monitorInputConfigChanges(configHandler, new LogLevelFilterHandler(), LogFeederPropertiesUtil.getClusterName()); metricsManager.init(); @@ -182,7 +182,7 @@ public class LogFeeder { if (cli.isMonitor()) { try { - LogFeederUtil.loadProperties(); + LogFeederPropertiesUtil.loadProperties(); } catch (Throwable t) { LOG.warn("Could not load logfeeder properites"); System.exit(1); http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java index 30b61a1..11df9dc 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java @@ -47,9 +47,9 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.ambari.logfeeder.util.AliasUtil.AliasType; +import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logsearch.config.api.InputConfigMonitor; import org.apache.ambari.logsearch.config.api.LogSearchConfig; -import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; @@ -62,30 +62,9 @@ import org.apache.log4j.Logger; import com.google.gson.reflect.TypeToken; -import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; - public class ConfigHandler implements InputConfigMonitor { private static final Logger LOG = Logger.getLogger(ConfigHandler.class); - @LogSearchPropertyDescription( - name = "logfeeder.config.files", - description = "Comma separated list of the config files containing global / output configurations.", - examples = {"global.json,output.json", "/etc/ambari-logsearch-logfeeder/conf/global.json"}, - defaultValue = "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String CONFIG_FILES_PROPERTY = "logfeeder.config.files"; - - private static final int DEFAULT_SIMULATE_INPUT_NUMBER = 0; - @LogSearchPropertyDescription( - name = "logfeeder.simulate.input_number", - description = "The number of the simulator instances to run with. O means no simulation.", - examples = {"10"}, - defaultValue = DEFAULT_SIMULATE_INPUT_NUMBER + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String SIMULATE_INPUT_NUMBER_PROPERTY = "logfeeder.simulate.input_number"; - private final LogSearchConfig logSearchConfig; private final OutputManager outputManager = new OutputManager(); @@ -135,7 +114,7 @@ public class ConfigHandler implements InputConfigMonitor { private List<String> getConfigFiles() { List<String> configFiles = new ArrayList<>(); - String logFeederConfigFilesProperty = LogFeederUtil.getStringProperty(CONFIG_FILES_PROPERTY); + String logFeederConfigFilesProperty = LogFeederPropertiesUtil.getConfigFiles(); LOG.info("logfeeder.config.files=" + logFeederConfigFilesProperty); if (logFeederConfigFilesProperty != null) { configFiles.addAll(Arrays.asList(logFeederConfigFilesProperty.split(","))); @@ -238,7 +217,7 @@ public class ConfigHandler implements InputConfigMonitor { } private void simulateIfNeeded() throws Exception { - int simulatedInputNumber = LogFeederUtil.getIntProperty(SIMULATE_INPUT_NUMBER_PROPERTY, DEFAULT_SIMULATE_INPUT_NUMBER); + int simulatedInputNumber = LogFeederPropertiesUtil.getSimulateInputNumber(); if (simulatedInputNumber == 0) return; http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java index 9535260..b021c37 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.ambari.logfeeder.util.FileUtil; +import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor; import org.apache.commons.lang.ObjectUtils; @@ -68,8 +69,7 @@ public abstract class AbstractInputFile extends Input { public void init() throws Exception { LOG.info("init() called"); - checkPointExtension = LogFeederUtil.getStringProperty(InputManager.CHECKPOINT_EXTENSION_PROPERTY, - InputManager.DEFAULT_CHECKPOINT_EXTENSION); + checkPointExtension = LogFeederPropertiesUtil.getCheckPointExtension(); // Let's close the file and set it to true after we start monitoring it setClosed(true); http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java index 8050263..972011d 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java @@ -31,8 +31,7 @@ import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.output.Output; import org.apache.ambari.logfeeder.output.OutputManager; -import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; +import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logsearch.config.api.model.inputconfig.Conditions; import org.apache.ambari.logsearch.config.api.model.inputconfig.Fields; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; @@ -40,58 +39,7 @@ import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor; import org.apache.commons.lang.BooleanUtils; import org.apache.log4j.Priority; -import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; - public abstract class Input extends ConfigItem implements Runnable { - private static final boolean DEFAULT_CACHE_ENABLED = false; - @LogSearchPropertyDescription( - name = "logfeeder.cache.enabled", - description = "Enables the usage of a cache to avoid duplications.", - examples = {"true"}, - defaultValue = DEFAULT_CACHE_ENABLED + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String CACHE_ENABLED_PROPERTY = "logfeeder.cache.enabled"; - - private static final String DEFAULT_CACHE_KEY_FIELD = "log_message"; - @LogSearchPropertyDescription( - name = "logfeeder.cache.key.field", - description = "The field which's value should be cached and should be checked for repteitions.", - examples = {"some_field_prone_to_repeating_value"}, - defaultValue = DEFAULT_CACHE_KEY_FIELD, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String CACHE_KEY_FIELD_PROPERTY = "logfeeder.cache.key.field"; - - private static final int DEFAULT_CACHE_SIZE = 100; - @LogSearchPropertyDescription( - name = "logfeeder.cache.size", - description = "The number of log entries to cache in order to avoid duplications.", - examples = {"50"}, - defaultValue = DEFAULT_CACHE_SIZE + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String CACHE_SIZE_PROPERTY = "logfeeder.cache.size"; - - private static final boolean DEFAULT_CACHE_LAST_DEDUP_ENABLED = false; - @LogSearchPropertyDescription( - name = "logfeeder.cache.last.dedup.enabled", - description = "Enable filtering directly repeating log entries irrelevant of the time spent between them.", - examples = {"true"}, - defaultValue = DEFAULT_CACHE_LAST_DEDUP_ENABLED + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String CACHE_LAST_DEDUP_ENABLED_PROPERTY = "logfeeder.cache.last.dedup.enabled"; - - private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000; - @LogSearchPropertyDescription( - name = "logfeeder.cache.dedup.interval", - description = "Maximum number of milliseconds between two identical messages to be filtered out.", - examples = {"500"}, - defaultValue = DEFAULT_CACHE_DEDUP_INTERVAL + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String CACHE_DEDUP_INTERVAL_PROPERTY = "logfeeder.cache.dedup.interval"; private static final boolean DEFAULT_TAIL = true; private static final boolean DEFAULT_USE_EVENT_MD5 = false; @@ -294,25 +242,25 @@ public abstract class Input extends ConfigItem implements Runnable { private void initCache() { boolean cacheEnabled = inputDescriptor.isCacheEnabled() != null ? inputDescriptor.isCacheEnabled() - : LogFeederUtil.getBooleanProperty(CACHE_ENABLED_PROPERTY, DEFAULT_CACHE_ENABLED); + : LogFeederPropertiesUtil.isCacheEnabled(); if (cacheEnabled) { String cacheKeyField = inputDescriptor.getCacheKeyField() != null ? inputDescriptor.getCacheKeyField() - : LogFeederUtil.getStringProperty(CACHE_KEY_FIELD_PROPERTY, DEFAULT_CACHE_KEY_FIELD); + : LogFeederPropertiesUtil.getCacheKeyField(); setCacheKeyField(cacheKeyField); - boolean cacheLastDedupEnabled = inputDescriptor.getCacheLastDedupEnabled() != null - ? inputDescriptor.getCacheLastDedupEnabled() - : LogFeederUtil.getBooleanProperty(CACHE_LAST_DEDUP_ENABLED_PROPERTY, DEFAULT_CACHE_LAST_DEDUP_ENABLED); - int cacheSize = inputDescriptor.getCacheSize() != null ? inputDescriptor.getCacheSize() - : LogFeederUtil.getIntProperty(CACHE_SIZE_PROPERTY, DEFAULT_CACHE_SIZE); + : LogFeederPropertiesUtil.getCacheSize(); + + boolean cacheLastDedupEnabled = inputDescriptor.getCacheLastDedupEnabled() != null + ? inputDescriptor.getCacheLastDedupEnabled() + : LogFeederPropertiesUtil.isCacheLastDedupEnabled(); long cacheDedupInterval = inputDescriptor.getCacheDedupInterval() != null ? inputDescriptor.getCacheDedupInterval() - : Long.parseLong(LogFeederUtil.getStringProperty(CACHE_DEDUP_INTERVAL_PROPERTY, String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL))); + : Long.parseLong(LogFeederPropertiesUtil.getCacheDedupInterval()); setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled)); } http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java index 10642d1..5fdfded 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java @@ -28,25 +28,14 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.ambari.logsearch.config.api.LogSearchConfig; -import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; -import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.log4j.Logger; import com.google.common.io.Files; -import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; - public class InputConfigUploader extends Thread { protected static final Logger LOG = Logger.getLogger(InputConfigUploader.class); - @LogSearchPropertyDescription( - name = "logfeeder.config.dir", - description = "The directory where shipper configuration files are looked for.", - examples = {"/etc/ambari-logsearch-logfeeder/conf"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String CONFIG_DIR_PROPERTY = "logfeeder.config.dir"; - private static final long SLEEP_BETWEEN_CHECK = 2000; private final File configDir; @@ -68,7 +57,7 @@ public class InputConfigUploader extends Thread { super("Input Config Loader"); setDaemon(true); - this.configDir = new File(LogFeederUtil.getStringProperty(CONFIG_DIR_PROPERTY)); + this.configDir = new File(LogFeederPropertiesUtil.getConfigDir()); this.config = config; } @@ -85,7 +74,7 @@ public class InputConfigUploader extends Thread { String inputConfig = Files.toString(inputConfigFile, Charset.defaultCharset()); if (!config.inputConfigExistsLogFeeder(serviceName)) { - config.createInputConfig(LogFeederUtil.getClusterName(), serviceName, inputConfig); + config.createInputConfig(LogFeederPropertiesUtil.getClusterName(), serviceName, inputConfig); } filesHandled.add(inputConfigFile.getAbsolutePath()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java index 091015a..f1b422f 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java @@ -35,36 +35,16 @@ import java.util.UUID; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.FileUtil; +import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.commons.io.filefilter.WildcardFileFilter; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; import org.apache.solr.common.util.Base64; -import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; - public class InputManager { private static final Logger LOG = Logger.getLogger(InputManager.class); - public static final String DEFAULT_CHECKPOINT_EXTENSION = ".cp"; - @LogSearchPropertyDescription( - name = "logfeeder.checkpoint.extension", - description = "The extension used for checkpoint files.", - examples = {"ckp"}, - defaultValue = DEFAULT_CHECKPOINT_EXTENSION, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - public static final String CHECKPOINT_EXTENSION_PROPERTY = "logfeeder.checkpoint.extension"; - - @LogSearchPropertyDescription( - name = "logfeeder.checkpoint.folder", - description = "The folder wher checkpoint files are stored.", - examples = {"/etc/ambari-logsearch-logfeeder/conf/checkpoints"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String CHECKPOINT_FOLDER_PROPERTY = "logfeeder.checkpoint.folder"; - private static final String CHECKPOINT_SUBFOLDER_NAME = "logfeeder_checkpoints"; private Map<String, List<Input>> inputs = new HashMap<>(); @@ -138,11 +118,11 @@ public class InputManager { } private void initCheckPointSettings() { - checkPointExtension = LogFeederUtil.getStringProperty(CHECKPOINT_EXTENSION_PROPERTY, DEFAULT_CHECKPOINT_EXTENSION); + checkPointExtension = LogFeederPropertiesUtil.getCheckPointExtension(); LOG.info("Determining valid checkpoint folder"); boolean isCheckPointFolderValid = false; // We need to keep track of the files we are reading. - String checkPointFolder = LogFeederUtil.getStringProperty(CHECKPOINT_FOLDER_PROPERTY); + String checkPointFolder = LogFeederPropertiesUtil.getCheckpointFolder(); if (!StringUtils.isEmpty(checkPointFolder)) { checkPointFolderFile = new File(checkPointFolder); isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); @@ -150,7 +130,7 @@ public class InputManager { if (!isCheckPointFolderValid) { // Let's use tmp folder - String tmpFolder = LogFeederUtil.getLogFeederTempDir(); + String tmpFolder = LogFeederPropertiesUtil.getLogFeederTempDir(); checkPointFolderFile = new File(tmpFolder, CHECKPOINT_SUBFOLDER_NAME); LOG.info("Checking if tmp folder can be used for checkpoints. Folder=" + checkPointFolderFile); isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java index 7c487ba..df6c941 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java @@ -33,8 +33,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.filter.FilterJSON; import org.apache.ambari.logfeeder.output.Output; +import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor; import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl; import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl; @@ -43,70 +43,9 @@ import org.apache.solr.common.util.Base64; import com.google.common.base.Joiner; -import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; - public class InputSimulate extends Input { private static final String LOG_TEXT_PATTERN = "{ logtime=\"%d\", level=\"%s\", log_message=\"%s\", host=\"%s\"}"; - - private static final String DEFAULT_LOG_LEVEL = "WARN"; - @LogSearchPropertyDescription( - name = "logfeeder.simulate.log_level", - description = "The log level to create the simulated log entries with.", - examples = {"INFO"}, - defaultValue = DEFAULT_LOG_LEVEL, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String LOG_LEVEL_PROPERTY = "logfeeder.simulate.log_level"; - - private static final int DEFAULT_NUMBER_OF_WORDS = 1000; - @LogSearchPropertyDescription( - name = "logfeeder.simulate.number_of_words", - description = "The size of the set of words that may be used to create the simulated log entries with.", - examples = {"100"}, - defaultValue = DEFAULT_NUMBER_OF_WORDS + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String NUMBER_OF_WORDS_PROPERTY = "logfeeder.simulate.number_of_words"; - - private static final int DEFAULT_MIN_LOG_WORDS = 5; - @LogSearchPropertyDescription( - name = "logfeeder.simulate.min_log_words", - description = "The minimum number of words in a simulated log entry.", - examples = {"3"}, - defaultValue = DEFAULT_MIN_LOG_WORDS + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String MIN_LOG_WORDS_PROPERTY = "logfeeder.simulate.min_log_words"; - - private static final int DEFAULT_MAX_LOG_WORDS = 5; - @LogSearchPropertyDescription( - name = "logfeeder.simulate.max_log_words", - description = "The maximum number of words in a simulated log entry.", - examples = {"8"}, - defaultValue = DEFAULT_MAX_LOG_WORDS + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String MAX_LOG_WORDS_PROPERTY = "logfeeder.simulate.max_log_words"; - - private static final int DEFAULT_SLEEP_MILLISECONDS = 10000; - @LogSearchPropertyDescription( - name = "logfeeder.simulate.sleep_milliseconds", - description = "The milliseconds to sleep between creating two simulated log entries.", - examples = {"5000"}, - defaultValue = DEFAULT_SLEEP_MILLISECONDS + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String SLEEP_MILLISECONDS_PROPERTY = "logfeeder.simulate.sleep_milliseconds"; - - @LogSearchPropertyDescription( - name = "logfeeder.simulate.log_ids", - description = "The comma separated list of log ids for which to create the simulated log entries.", - examples = {"ambari_server,zookeeper,infra_solr,logsearch_app"}, - defaultValue = "The log ids of the installed services in the cluster", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String LOG_IDS_PROPERTY = "logfeeder.simulate.log_ids"; - + private static final Map<String, String> typeToFilePath = new HashMap<>(); private static final List<String> inputTypes = new ArrayList<>(); public static void loadTypeToFilePath(List<InputDescriptor> inputList) { @@ -137,11 +76,11 @@ public class InputSimulate extends Input { public InputSimulate() throws Exception { this.types = getSimulatedLogTypes(); - this.level = LogFeederUtil.getStringProperty(LOG_LEVEL_PROPERTY, DEFAULT_LOG_LEVEL); - this.numberOfWords = LogFeederUtil.getIntProperty(NUMBER_OF_WORDS_PROPERTY, DEFAULT_NUMBER_OF_WORDS, 50, 1000000); - this.minLogWords = LogFeederUtil.getIntProperty(MIN_LOG_WORDS_PROPERTY, DEFAULT_MIN_LOG_WORDS, 1, 10); - this.maxLogWords = LogFeederUtil.getIntProperty(MAX_LOG_WORDS_PROPERTY, DEFAULT_MAX_LOG_WORDS, 10, 20); - this.sleepMillis = LogFeederUtil.getIntProperty(SLEEP_MILLISECONDS_PROPERTY, DEFAULT_SLEEP_MILLISECONDS); + this.level = LogFeederPropertiesUtil.getSimulateLogLevel(); + this.numberOfWords = LogFeederPropertiesUtil.getSimulateNumberOfWords(); + this.minLogWords = LogFeederPropertiesUtil.getSimulateMinLogWords(); + this.maxLogWords = LogFeederPropertiesUtil.getSimulateMaxLogWords(); + this.sleepMillis = LogFeederPropertiesUtil.getSimulateSleepMilliseconds(); this.host = "#" + hostNumber.incrementAndGet() + "-" + LogFeederUtil.hostName; Filter filter = new FilterJSON(); @@ -151,7 +90,7 @@ public class InputSimulate extends Input { } private List<String> getSimulatedLogTypes() { - String logsToSimulate = LogFeederUtil.getStringProperty(LOG_IDS_PROPERTY); + String logsToSimulate = LogFeederPropertiesUtil.getSimulateLogIds(); return (logsToSimulate == null) ? inputTypes : Arrays.asList(logsToSimulate.split(",")); http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java index 79bf5ea..e44873b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java @@ -29,37 +29,16 @@ import java.util.Map; import java.util.TimeZone; import org.apache.ambari.logfeeder.common.LogFeederConstants; -import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor; import org.apache.ambari.logsearch.config.api.LogSearchConfig; -import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; -import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; - public class LogLevelFilterHandler implements LogLevelFilterMonitor { private static final Logger LOG = Logger.getLogger(LogLevelFilterHandler.class); - - private static final boolean DEFAULT_LOG_FILTER_ENABLE = false; - @LogSearchPropertyDescription( - name = "logfeeder.log.filter.enable", - description = "Enables the filtering of the log entries by log level filters.", - examples = {"true"}, - defaultValue = DEFAULT_LOG_FILTER_ENABLE + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String LOG_FILTER_ENABLE_PROPERTY = "logfeeder.log.filter.enable"; - - @LogSearchPropertyDescription( - name = "logfeeder.include.default.level", - description = "Comma separtaed list of the default log levels to be enabled by the filtering.", - examples = {"FATAL,ERROR,WARN"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String INCLUDE_DEFAULT_LEVEL_PROPERTY = "logfeeder.include.default.level"; private static final String TIMEZONE = "GMT"; private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS"; @@ -79,8 +58,8 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor { public static void init(LogSearchConfig config_) { config = config_; - filterEnabled = LogFeederUtil.getBooleanProperty(LOG_FILTER_ENABLE_PROPERTY, DEFAULT_LOG_FILTER_ENABLE); - defaultLogLevels = Arrays.asList(LogFeederUtil.getStringProperty(INCLUDE_DEFAULT_LEVEL_PROPERTY).split(",")); + filterEnabled = LogFeederPropertiesUtil.isLogFilterEnabled(); + defaultLogLevels = Arrays.asList(LogFeederPropertiesUtil.getIncludeDefaultLevel().split(",")); TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE)); } @@ -120,7 +99,7 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor { defaultFilter.setDefaultLevels(defaultLogLevels); try { - config.createLogLevelFilter(LogFeederUtil.getClusterName(), logId, defaultFilter); + config.createLogLevelFilter(LogFeederPropertiesUtil.getClusterName(), logId, defaultFilter); filters.put(logId, defaultFilter); } catch (Exception e) { LOG.warn("Could not persist the default filter for log " + logId, e); http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java index fdad9a6..f446446 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java @@ -19,9 +19,8 @@ package org.apache.ambari.logfeeder.metrics; -import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logfeeder.util.SSLUtil; -import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; @@ -29,8 +28,6 @@ import org.apache.log4j.Logger; import com.google.common.base.Splitter; -import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; - import java.util.Collection; import java.util.List; @@ -38,53 +35,21 @@ import java.util.List; public class LogFeederAMSClient extends AbstractTimelineMetricsSink { private static final Logger LOG = Logger.getLogger(LogFeederAMSClient.class); - @LogSearchPropertyDescription( - name = "logfeeder.metrics.collector.hosts", - description = "Comma separtaed list of metric collector hosts.", - examples = {"c6401.ambari.apache.org"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String METRICS_COLLECTOR_HOSTS_PROPERTY = "logfeeder.metrics.collector.hosts"; - - @LogSearchPropertyDescription( - name = "logfeeder.metrics.collector.protocol", - description = "The protocol used by metric collectors.", - examples = {"http", "https"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String METRICS_COLLECTOR_PROTOCOL_PROPERTY = "logfeeder.metrics.collector.protocol"; - - @LogSearchPropertyDescription( - name = "logfeeder.metrics.collector.port", - description = "The port used by metric collectors.", - examples = {"6188"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String METRICS_COLLECTOR_PORT_PROPERTY = "logfeeder.metrics.collector.port"; - - @LogSearchPropertyDescription( - name = "logfeeder.metrics.collector.path", - description = "The path used by metric collectors.", - examples = {"/ws/v1/timeline/metrics"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String METRICS_COLLECTOR_PATH_PROPERTY = "logfeeder.metrics.collector.path"; - private final List<String> collectorHosts; private final String collectorProtocol; private final String collectorPort; private final String collectorPath; public LogFeederAMSClient() { - String collectorHostsString = LogFeederUtil.getStringProperty(METRICS_COLLECTOR_HOSTS_PROPERTY); + String collectorHostsString = LogFeederPropertiesUtil.getMetricsCollectorHosts(); if (!StringUtils.isBlank(collectorHostsString)) { collectorHostsString = collectorHostsString.trim(); LOG.info("AMS collector Hosts=" + collectorHostsString); collectorHosts = Splitter.on(",").splitToList(collectorHostsString); - collectorProtocol = LogFeederUtil.getStringProperty(METRICS_COLLECTOR_PROTOCOL_PROPERTY); - collectorPort = LogFeederUtil.getStringProperty(METRICS_COLLECTOR_PORT_PROPERTY); - collectorPath = LogFeederUtil.getStringProperty(METRICS_COLLECTOR_PATH_PROPERTY); + collectorProtocol = LogFeederPropertiesUtil.getMetricsCollectorProtocol(); + collectorPort = LogFeederPropertiesUtil.getMetricsCollectorPort(); + collectorPath = LogFeederPropertiesUtil.getMetricsCollectorPath(); } else { collectorHosts = null; collectorProtocol = null; http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java index 2b47a00..ba4d60a 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java @@ -26,6 +26,7 @@ import org.apache.ambari.logfeeder.output.spool.RolloverCondition; import org.apache.ambari.logfeeder.output.spool.RolloverHandler; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil; +import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logfeeder.util.PlaceholderUtil; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; @@ -87,7 +88,7 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC HashMap<String, String> contextParam = buildContextParam(); hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam); LOG.info("hdfs Output dir=" + hdfsOutDir); - String localFileDir = LogFeederUtil.getLogFeederTempDir() + "hdfs/service/"; + String localFileDir = LogFeederPropertiesUtil.getLogFeederTempDir() + "hdfs/service/"; logSpooler = new LogSpooler(localFileDir, filenamePrefix, this, this); this.startHDFSCopyThread(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java index 9f41a15..5b213e8 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java @@ -26,6 +26,7 @@ import org.apache.ambari.logfeeder.output.spool.LogSpooler; import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext; import org.apache.ambari.logfeeder.output.spool.RolloverCondition; import org.apache.ambari.logfeeder.output.spool.RolloverHandler; +import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.util.S3Util; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; @@ -205,7 +206,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH @VisibleForTesting protected LogSpooler createSpooler(String filePath) { - String spoolDirectory = LogFeederUtil.getLogFeederTempDir() + "/s3/service"; + String spoolDirectory = LogFeederPropertiesUtil.getLogFeederTempDir() + "/s3/service"; LOG.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s", spoolDirectory, filePath)); return new LogSpooler(spoolDirectory, new File(filePath).getName()+"-", this, this, s3OutputConfiguration.getRolloverTimeThresholdSecs()); http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java index e3da864..38219df 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java @@ -37,8 +37,8 @@ import java.util.concurrent.TimeUnit; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.util.DateUtil; +import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties; import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; import org.apache.commons.lang3.StringUtils; @@ -56,34 +56,12 @@ import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.DocCollection; -import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; - public class OutputSolr extends Output implements CollectionStateWatcher { private static final Logger LOG = Logger.getLogger(OutputSolr.class); private static final int OUTPUT_PROPERTIES_WAIT_MS = 10000; private static final int SHARDS_WAIT_MS = 10000; - - private static final String DEFAULT_SOLR_JAAS_FILE = "/etc/security/keytabs/logsearch_solr.service.keytab"; - @LogSearchPropertyDescription( - name = "logfeeder.solr.jaas.file", - description = "The jaas file used for solr.", - examples = {"/etc/ambari-logsearch-logfeeder/conf/logfeeder_jaas.conf"}, - defaultValue = DEFAULT_SOLR_JAAS_FILE, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String SOLR_JAAS_FILE_PROPERTY = "logfeeder.solr.jaas.file"; - - private static final boolean DEFAULT_SOLR_KERBEROS_ENABLE = false; - @LogSearchPropertyDescription( - name = "logfeeder.solr.kerberos.enable", - description = "Enables using kerberos for accessing solr.", - examples = {"true"}, - defaultValue = DEFAULT_SOLR_KERBEROS_ENABLE + "", - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String SOLR_KERBEROS_ENABLE_PROPERTY = "logfeeder.solr.kerberos.enable"; private static final int DEFAULT_MAX_BUFFER_SIZE = 5000; private static final int DEFAULT_MAX_INTERVAL_MS = 3000; @@ -197,8 +175,8 @@ public class OutputSolr extends Output implements CollectionStateWatcher { } private void setupSecurity() { - String jaasFile = LogFeederUtil.getStringProperty(SOLR_JAAS_FILE_PROPERTY, DEFAULT_SOLR_JAAS_FILE); - boolean securityEnabled = LogFeederUtil.getBooleanProperty(SOLR_KERBEROS_ENABLE_PROPERTY, DEFAULT_SOLR_KERBEROS_ENABLE); + String jaasFile = LogFeederPropertiesUtil.getSolrJaasFile(); + boolean securityEnabled = LogFeederPropertiesUtil.isSolrKerberosEnabled(); if (securityEnabled) { System.setProperty("java.security.auth.login.config", jaasFile); HttpClientUtil.addConfigurer(new Krb5HttpClientConfigurer()); @@ -397,21 +375,22 @@ public class OutputSolr extends Output implements CollectionStateWatcher { if (outputData != null) { createSolrDocument(outputData); } else { - if (isDrain() && outgoingBuffer.size() == 0) { + if (isDrain() && outgoingBuffer.isEmpty()) { break; } } - if (localBuffer.size() > 0 && ((outputData == null && isDrain()) || - (nextDispatchDuration <= 0 || localBuffer.size() >= maxBufferSize))) { + if (!localBuffer.isEmpty() && + (outputData == null && isDrain() || nextDispatchDuration <= 0 || localBuffer.size() >= maxBufferSize) + ) { boolean response = sendToSolr(outputData); - if( isDrain() && !response) { + if (isDrain() && !response) { //Since sending to Solr response failed and it is in draining mode, let's break; LOG.warn("In drain mode and sending to Solr failed. So exiting. output=" + getShortDescription()); break; } } - if (localBuffer.size() == 0) { + if (localBuffer.isEmpty()) { //If localBuffer is empty, then reset the timer lastDispatchTime = currTimeMS; } http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederPropertiesUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederPropertiesUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederPropertiesUtil.java new file mode 100644 index 0000000..1636653 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederPropertiesUtil.java @@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.util; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.util.HashMap; +import java.util.Properties; + +import org.apache.ambari.logfeeder.LogFeeder; +import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Logger; + +/** + * This class contains utility methods used by LogFeeder + */ +public class LogFeederPropertiesUtil { + private static final Logger LOG = Logger.getLogger(LogFeederPropertiesUtil.class); + + public static final String LOGFEEDER_PROPERTIES_FILE = "logfeeder.properties"; + + private static Properties props; + public static Properties getProperties() { + return props; + } + + public static void loadProperties() throws Exception { + loadProperties(LOGFEEDER_PROPERTIES_FILE); + } + + /** + * This method will read the properties from System, followed by propFile and finally from the map + */ + public static void loadProperties(String propFile) throws Exception { + LOG.info("Loading properties. propFile=" + propFile); + props = new Properties(System.getProperties()); + boolean propLoaded = false; + + // First get properties file path from environment value + String propertiesFilePath = System.getProperty("properties"); + if (StringUtils.isNotEmpty(propertiesFilePath)) { + File propertiesFile = new File(propertiesFilePath); + if (propertiesFile.exists() && propertiesFile.isFile()) { + LOG.info("Properties file path set in environment. Loading properties file=" + propertiesFilePath); + try (FileInputStream fis = new FileInputStream(propertiesFile)) { + props.load(fis); + propLoaded = true; + } catch (Throwable t) { + LOG.error("Error loading properties file. properties file=" + propertiesFile.getAbsolutePath()); + } + } else { + LOG.error("Properties file path set in environment, but file not found. properties file=" + propertiesFilePath); + } + } + + if (!propLoaded) { + try (BufferedInputStream bis = (BufferedInputStream) LogFeeder.class.getClassLoader().getResourceAsStream(propFile)) { + // Properties not yet loaded, let's try from class loader + if (bis != null) { + LOG.info("Loading properties file " + propFile + " from classpath"); + props.load(bis); + propLoaded = true; + } else { + LOG.fatal("Properties file not found in classpath. properties file name= " + propFile); + } + } + } + + if (!propLoaded) { + LOG.fatal("Properties file is not loaded."); + throw new Exception("Properties not loaded"); + } + } + + public static String getStringProperty(String key) { + return props == null ? null : props.getProperty(key); + } + + public static String getStringProperty(String key, String defaultValue) { + return props == null ? defaultValue : props.getProperty(key, defaultValue); + } + + public static boolean getBooleanProperty(String key, boolean defaultValue) { + String value = getStringProperty(key); + return toBoolean(value, defaultValue); + } + + private static boolean toBoolean(String value, boolean defaultValue) { + if (StringUtils.isEmpty(value)) { + return defaultValue; + } + + return "true".equalsIgnoreCase(value) || "yes".equalsIgnoreCase(value); + } + + public static int getIntProperty(String key, int defaultValue) { + return getIntProperty(key, defaultValue, null, null); + } + + public static int getIntProperty(String key, int defaultValue, Integer minValue, Integer maxValue) { + String value = getStringProperty(key); + int retValue = LogFeederUtil.objectToInt(value, defaultValue, ", key=" + key); + if (minValue != null && retValue < minValue) { + LOG.info("Minimum rule was applied for " + key + ": " + retValue + " < " + minValue); + retValue = minValue; + } + if (maxValue != null && retValue > maxValue) { + LOG.info("Maximum rule was applied for " + key + ": " + retValue + " > " + maxValue); + retValue = maxValue; + } + return retValue; + } + + private static final String CLUSTER_NAME_PROPERTY = "cluster.name"; + + @LogSearchPropertyDescription( + name = CLUSTER_NAME_PROPERTY, + description = "The name of the cluster the Log Feeder program runs in.", + examples = {"cl1"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getClusterName() { + return getStringProperty(CLUSTER_NAME_PROPERTY); + } + + private static final String TMP_DIR_PROPERTY = "logfeeder.tmp.dir"; + private static final String DEFAULT_TMP_DIR = "/tmp/$username/logfeeder/"; + private static String logFeederTempDir = null; + + @LogSearchPropertyDescription( + name = TMP_DIR_PROPERTY, + description = "The tmp dir used for creating temporary files.", + examples = {"/tmp/"}, + defaultValue = DEFAULT_TMP_DIR, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public synchronized static String getLogFeederTempDir() { + if (logFeederTempDir == null) { + String tempDirValue = getStringProperty(TMP_DIR_PROPERTY, DEFAULT_TMP_DIR); + HashMap<String, String> contextParam = new HashMap<String, String>(); + String username = System.getProperty("user.name"); + contextParam.put("username", username); + logFeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue, contextParam); + } + return logFeederTempDir; + } + + public static final String CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY = "hadoop.security.credential.provider.path"; + + @LogSearchPropertyDescription( + name = CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY, + description = "The jceks file that provides passwords.", + examples = {"jceks://file/etc/ambari-logsearch-logfeeder/conf/logfeeder.jceks"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getCredentialStoreProviderPath() { + return getStringProperty(CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY); + } + + private static final String CONFIG_FILES_PROPERTY = "logfeeder.config.files"; + + @LogSearchPropertyDescription( + name = CONFIG_FILES_PROPERTY, + description = "Comma separated list of the config files containing global / output configurations.", + examples = {"global.json,output.json", "/etc/ambari-logsearch-logfeeder/conf/global.json"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getConfigFiles() { + return getStringProperty(CONFIG_FILES_PROPERTY); + } + + private static final String CONFIG_DIR_PROPERTY = "logfeeder.config.dir"; + + @LogSearchPropertyDescription( + name = CONFIG_DIR_PROPERTY, + description = "The directory where shipper configuration files are looked for.", + examples = {"/etc/ambari-logsearch-logfeeder/conf"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getConfigDir() { + return getStringProperty(CONFIG_DIR_PROPERTY); + } + + public static final String CHECKPOINT_EXTENSION_PROPERTY = "logfeeder.checkpoint.extension"; + public static final String DEFAULT_CHECKPOINT_EXTENSION = ".cp"; + + @LogSearchPropertyDescription( + name = CHECKPOINT_EXTENSION_PROPERTY, + description = "The extension used for checkpoint files.", + examples = {"ckp"}, + defaultValue = DEFAULT_CHECKPOINT_EXTENSION, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getCheckPointExtension() { + return getStringProperty(CHECKPOINT_EXTENSION_PROPERTY, DEFAULT_CHECKPOINT_EXTENSION); + } + + private static final String CHECKPOINT_FOLDER_PROPERTY = "logfeeder.checkpoint.folder"; + + @LogSearchPropertyDescription( + name = CHECKPOINT_FOLDER_PROPERTY, + description = "The folder wher checkpoint files are stored.", + examples = {"/etc/ambari-logsearch-logfeeder/conf/checkpoints"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getCheckpointFolder() { + return getStringProperty(CHECKPOINT_FOLDER_PROPERTY); + } + + private static final String CACHE_ENABLED_PROPERTY = "logfeeder.cache.enabled"; + private static final boolean DEFAULT_CACHE_ENABLED = false; + + @LogSearchPropertyDescription( + name = CACHE_ENABLED_PROPERTY, + description = "Enables the usage of a cache to avoid duplications.", + examples = {"true"}, + defaultValue = DEFAULT_CACHE_ENABLED + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static boolean isCacheEnabled() { + return getBooleanProperty(CACHE_ENABLED_PROPERTY, DEFAULT_CACHE_ENABLED); + } + + private static final String CACHE_KEY_FIELD_PROPERTY = "logfeeder.cache.key.field"; + private static final String DEFAULT_CACHE_KEY_FIELD = "log_message"; + + @LogSearchPropertyDescription( + name = CACHE_KEY_FIELD_PROPERTY, + description = "The field which's value should be cached and should be checked for repteitions.", + examples = {"some_field_prone_to_repeating_value"}, + defaultValue = DEFAULT_CACHE_KEY_FIELD, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getCacheKeyField() { + return getStringProperty(CACHE_KEY_FIELD_PROPERTY, DEFAULT_CACHE_KEY_FIELD); + } + + private static final String CACHE_SIZE_PROPERTY = "logfeeder.cache.size"; + private static final int DEFAULT_CACHE_SIZE = 100; + + @LogSearchPropertyDescription( + name = CACHE_SIZE_PROPERTY, + description = "The number of log entries to cache in order to avoid duplications.", + examples = {"50"}, + defaultValue = DEFAULT_CACHE_SIZE + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static int getCacheSize() { + return getIntProperty(CACHE_SIZE_PROPERTY, DEFAULT_CACHE_SIZE); + } + + private static final String CACHE_LAST_DEDUP_ENABLED_PROPERTY = "logfeeder.cache.last.dedup.enabled"; + private static final boolean DEFAULT_CACHE_LAST_DEDUP_ENABLED = false; + + @LogSearchPropertyDescription( + name = CACHE_LAST_DEDUP_ENABLED_PROPERTY, + description = "Enable filtering directly repeating log entries irrelevant of the time spent between them.", + examples = {"true"}, + defaultValue = DEFAULT_CACHE_LAST_DEDUP_ENABLED + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static boolean isCacheLastDedupEnabled() { + return getBooleanProperty(CACHE_LAST_DEDUP_ENABLED_PROPERTY, DEFAULT_CACHE_LAST_DEDUP_ENABLED); + } + + private static final String CACHE_DEDUP_INTERVAL_PROPERTY = "logfeeder.cache.dedup.interval"; + private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000; + + @LogSearchPropertyDescription( + name = CACHE_DEDUP_INTERVAL_PROPERTY, + description = "Maximum number of milliseconds between two identical messages to be filtered out.", + examples = {"500"}, + defaultValue = DEFAULT_CACHE_DEDUP_INTERVAL + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getCacheDedupInterval() { + return getStringProperty(CACHE_DEDUP_INTERVAL_PROPERTY, String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL)); + } + + private static final String LOG_FILTER_ENABLE_PROPERTY = "logfeeder.log.filter.enable"; + private static final boolean DEFAULT_LOG_FILTER_ENABLE = false; + + @LogSearchPropertyDescription( + name = LOG_FILTER_ENABLE_PROPERTY, + description = "Enables the filtering of the log entries by log level filters.", + examples = {"true"}, + defaultValue = DEFAULT_LOG_FILTER_ENABLE + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static boolean isLogFilterEnabled() { + return getBooleanProperty(LOG_FILTER_ENABLE_PROPERTY, DEFAULT_LOG_FILTER_ENABLE); + } + + private static final String INCLUDE_DEFAULT_LEVEL_PROPERTY = "logfeeder.include.default.level"; + + @LogSearchPropertyDescription( + name = INCLUDE_DEFAULT_LEVEL_PROPERTY, + description = "Comma separtaed list of the default log levels to be enabled by the filtering.", + examples = {"FATAL,ERROR,WARN"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getIncludeDefaultLevel() { + return getStringProperty(INCLUDE_DEFAULT_LEVEL_PROPERTY); + } + + private static final String DEFAULT_SOLR_JAAS_FILE = "/etc/security/keytabs/logsearch_solr.service.keytab"; + private static final String SOLR_JAAS_FILE_PROPERTY = "logfeeder.solr.jaas.file"; + + @LogSearchPropertyDescription( + name = SOLR_JAAS_FILE_PROPERTY, + description = "The jaas file used for solr.", + examples = {"/etc/ambari-logsearch-logfeeder/conf/logfeeder_jaas.conf"}, + defaultValue = DEFAULT_SOLR_JAAS_FILE, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getSolrJaasFile() { + return getStringProperty(SOLR_JAAS_FILE_PROPERTY, DEFAULT_SOLR_JAAS_FILE); + } + + private static final String SOLR_KERBEROS_ENABLE_PROPERTY = "logfeeder.solr.kerberos.enable"; + private static final boolean DEFAULT_SOLR_KERBEROS_ENABLE = false; + + @LogSearchPropertyDescription( + name = SOLR_KERBEROS_ENABLE_PROPERTY, + description = "Enables using kerberos for accessing solr.", + examples = {"true"}, + defaultValue = DEFAULT_SOLR_KERBEROS_ENABLE + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static boolean isSolrKerberosEnabled() { + return getBooleanProperty(SOLR_KERBEROS_ENABLE_PROPERTY, DEFAULT_SOLR_KERBEROS_ENABLE); + } + + private static final String METRICS_COLLECTOR_HOSTS_PROPERTY = "logfeeder.metrics.collector.hosts"; + + @LogSearchPropertyDescription( + name = METRICS_COLLECTOR_HOSTS_PROPERTY, + description = "Comma separtaed list of metric collector hosts.", + examples = {"c6401.ambari.apache.org"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getMetricsCollectorHosts() { + return getStringProperty(METRICS_COLLECTOR_HOSTS_PROPERTY); + } + + private static final String METRICS_COLLECTOR_PROTOCOL_PROPERTY = "logfeeder.metrics.collector.protocol"; + + @LogSearchPropertyDescription( + name = METRICS_COLLECTOR_PROTOCOL_PROPERTY, + description = "The protocol used by metric collectors.", + examples = {"http", "https"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getMetricsCollectorProtocol() { + return getStringProperty(METRICS_COLLECTOR_PROTOCOL_PROPERTY); + } + + private static final String METRICS_COLLECTOR_PORT_PROPERTY = "logfeeder.metrics.collector.port"; + + @LogSearchPropertyDescription( + name = METRICS_COLLECTOR_PORT_PROPERTY, + description = "The port used by metric collectors.", + examples = {"6188"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getMetricsCollectorPort() { + return getStringProperty(METRICS_COLLECTOR_PORT_PROPERTY); + } + + private static final String METRICS_COLLECTOR_PATH_PROPERTY = "logfeeder.metrics.collector.path"; + + @LogSearchPropertyDescription( + name = METRICS_COLLECTOR_PATH_PROPERTY, + description = "The path used by metric collectors.", + examples = {"/ws/v1/timeline/metrics"}, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getMetricsCollectorPath() { + return getStringProperty(METRICS_COLLECTOR_PATH_PROPERTY); + } + + private static final String SIMULATE_INPUT_NUMBER_PROPERTY = "logfeeder.simulate.input_number"; + private static final int DEFAULT_SIMULATE_INPUT_NUMBER = 0; + + @LogSearchPropertyDescription( + name = SIMULATE_INPUT_NUMBER_PROPERTY, + description = "The number of the simulator instances to run with. O means no simulation.", + examples = {"10"}, + defaultValue = DEFAULT_SIMULATE_INPUT_NUMBER + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static int getSimulateInputNumber() { + return getIntProperty(SIMULATE_INPUT_NUMBER_PROPERTY, DEFAULT_SIMULATE_INPUT_NUMBER); + } + + private static final String SIMULATE_LOG_LEVEL_PROPERTY = "logfeeder.simulate.log_level"; + private static final String DEFAULT_SIMULATE_LOG_LEVEL = "WARN"; + + @LogSearchPropertyDescription( + name = SIMULATE_LOG_LEVEL_PROPERTY, + description = "The log level to create the simulated log entries with.", + examples = {"INFO"}, + defaultValue = DEFAULT_SIMULATE_LOG_LEVEL, + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getSimulateLogLevel() { + return getStringProperty(SIMULATE_LOG_LEVEL_PROPERTY, DEFAULT_SIMULATE_LOG_LEVEL); + } + + private static final String SIMULATE_NUMBER_OF_WORDS_PROPERTY = "logfeeder.simulate.number_of_words"; + private static final int DEFAULT_SIMULATE_NUMBER_OF_WORDS = 1000; + + @LogSearchPropertyDescription( + name = SIMULATE_NUMBER_OF_WORDS_PROPERTY, + description = "The size of the set of words that may be used to create the simulated log entries with.", + examples = {"100"}, + defaultValue = DEFAULT_SIMULATE_NUMBER_OF_WORDS + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static int getSimulateNumberOfWords() { + return getIntProperty(SIMULATE_NUMBER_OF_WORDS_PROPERTY, DEFAULT_SIMULATE_NUMBER_OF_WORDS, 50, 1000000); + } + + private static final String SIMULATE_MIN_LOG_WORDS_PROPERTY = "logfeeder.simulate.min_log_words"; + private static final int DEFAULT_SIMULATE_MIN_LOG_WORDS = 5; + + @LogSearchPropertyDescription( + name = SIMULATE_MIN_LOG_WORDS_PROPERTY, + description = "The minimum number of words in a simulated log entry.", + examples = {"3"}, + defaultValue = DEFAULT_SIMULATE_MIN_LOG_WORDS + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static int getSimulateMinLogWords() { + return getIntProperty(SIMULATE_MIN_LOG_WORDS_PROPERTY, DEFAULT_SIMULATE_MIN_LOG_WORDS, 1, 10); + } + + private static final String SIMULATE_MAX_LOG_WORDS_PROPERTY = "logfeeder.simulate.max_log_words"; + private static final int DEFAULT_SIMULATE_MAX_LOG_WORDS = 5; + + @LogSearchPropertyDescription( + name = SIMULATE_MAX_LOG_WORDS_PROPERTY, + description = "The maximum number of words in a simulated log entry.", + examples = {"8"}, + defaultValue = DEFAULT_SIMULATE_MAX_LOG_WORDS + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static int getSimulateMaxLogWords() { + return getIntProperty(SIMULATE_MAX_LOG_WORDS_PROPERTY, DEFAULT_SIMULATE_MAX_LOG_WORDS, 10, 20); + } + + private static final String SIMULATE_SLEEP_MILLISECONDS_PROPERTY = "logfeeder.simulate.sleep_milliseconds"; + private static final int DEFAULT_SIMULATE_SLEEP_MILLISECONDS = 10000; + + @LogSearchPropertyDescription( + name = SIMULATE_SLEEP_MILLISECONDS_PROPERTY, + description = "The milliseconds to sleep between creating two simulated log entries.", + examples = {"5000"}, + defaultValue = DEFAULT_SIMULATE_SLEEP_MILLISECONDS + "", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static int getSimulateSleepMilliseconds() { + return getIntProperty(SIMULATE_SLEEP_MILLISECONDS_PROPERTY, DEFAULT_SIMULATE_SLEEP_MILLISECONDS); + } + + private static final String SIMULATE_LOG_IDS_PROPERTY = "logfeeder.simulate.log_ids"; + + @LogSearchPropertyDescription( + name = SIMULATE_LOG_IDS_PROPERTY, + description = "The comma separated list of log ids for which to create the simulated log entries.", + examples = {"ambari_server,zookeeper,infra_solr,logsearch_app"}, + defaultValue = "The log ids of the installed services in the cluster", + sources = {LOGFEEDER_PROPERTIES_FILE} + ) + public static String getSimulateLogIds() { + return getStringProperty(SIMULATE_LOG_IDS_PROPERTY); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java index a244a4e..9efab25 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java @@ -19,20 +19,14 @@ package org.apache.ambari.logfeeder.util; -import java.io.BufferedInputStream; -import java.io.File; -import java.io.FileInputStream; import java.lang.reflect.Type; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Hashtable; import java.util.Map; -import java.util.Properties; -import org.apache.ambari.logfeeder.LogFeeder; import org.apache.ambari.logfeeder.metrics.MetricData; -import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -47,26 +41,6 @@ import com.google.gson.reflect.TypeToken; public class LogFeederUtil { private static final Logger LOG = Logger.getLogger(LogFeederUtil.class); - public static final String LOGFEEDER_PROPERTIES_FILE = "logfeeder.properties"; - - @LogSearchPropertyDescription( - name = "cluster.name", - description = "The name of the cluster the Log Feeder program runs in.", - examples = {"cl1"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String CLUSTER_NAME_PROPERTY = "cluster.name"; - - private static final String DEFAULT_TMP_DIR = "/tmp/$username/logfeeder/"; - @LogSearchPropertyDescription( - name = "logfeeder.tmp.dir", - description = "The tmp dir used for creating temporary files.", - examples = {"/tmp/"}, - defaultValue = DEFAULT_TMP_DIR, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String TMP_DIR_PROPERTY = "logfeeder.tmp.dir"; - private final static String GSON_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; private static Gson gson = new GsonBuilder().setDateFormat(GSON_DATE_FORMAT).create(); @@ -96,113 +70,6 @@ public class LogFeederUtil { LOG.error("Error getting hostname.", e); } } - - private static Properties props; - public static Properties getProperties() { - return props; - } - - public static void loadProperties() throws Exception { - loadProperties(LOGFEEDER_PROPERTIES_FILE); - } - - /** - * This method will read the properties from System, followed by propFile and finally from the map - */ - public static void loadProperties(String propFile) throws Exception { - LOG.info("Loading properties. propFile=" + propFile); - props = new Properties(System.getProperties()); - boolean propLoaded = false; - - // First get properties file path from environment value - String propertiesFilePath = System.getProperty("properties"); - if (StringUtils.isNotEmpty(propertiesFilePath)) { - File propertiesFile = new File(propertiesFilePath); - if (propertiesFile.exists() && propertiesFile.isFile()) { - LOG.info("Properties file path set in environment. Loading properties file=" + propertiesFilePath); - try (FileInputStream fis = new FileInputStream(propertiesFile)) { - props.load(fis); - propLoaded = true; - } catch (Throwable t) { - LOG.error("Error loading properties file. properties file=" + propertiesFile.getAbsolutePath()); - } - } else { - LOG.error("Properties file path set in environment, but file not found. properties file=" + propertiesFilePath); - } - } - - if (!propLoaded) { - try (BufferedInputStream bis = (BufferedInputStream) LogFeeder.class.getClassLoader().getResourceAsStream(propFile)) { - // Properties not yet loaded, let's try from class loader - if (bis != null) { - LOG.info("Loading properties file " + propFile + " from classpath"); - props.load(bis); - propLoaded = true; - } else { - LOG.fatal("Properties file not found in classpath. properties file name= " + propFile); - } - } - } - - if (!propLoaded) { - LOG.fatal("Properties file is not loaded."); - throw new Exception("Properties not loaded"); - } - } - - public static String getStringProperty(String key) { - return props == null ? null : props.getProperty(key); - } - - public static String getStringProperty(String key, String defaultValue) { - return props == null ? defaultValue : props.getProperty(key, defaultValue); - } - - public static boolean getBooleanProperty(String key, boolean defaultValue) { - String value = getStringProperty(key); - return toBoolean(value, defaultValue); - } - - private static boolean toBoolean(String value, boolean defaultValue) { - if (StringUtils.isEmpty(value)) { - return defaultValue; - } - - return "true".equalsIgnoreCase(value) || "yes".equalsIgnoreCase(value); - } - - public static int getIntProperty(String key, int defaultValue) { - return getIntProperty(key, defaultValue, null, null); - } - - public static int getIntProperty(String key, int defaultValue, Integer minValue, Integer maxValue) { - String value = getStringProperty(key); - int retValue = objectToInt(value, defaultValue, ", key=" + key); - if (minValue != null && retValue < minValue) { - LOG.info("Minimum rule was applied for " + key + ": " + retValue + " < " + minValue); - retValue = minValue; - } - if (maxValue != null && retValue > maxValue) { - LOG.info("Maximum rule was applied for " + key + ": " + retValue + " > " + maxValue); - retValue = maxValue; - } - return retValue; - } - - public static int objectToInt(Object objValue, int retValue, String errMessage) { - if (objValue == null) { - return retValue; - } - String strValue = objValue.toString(); - if (StringUtils.isNotEmpty(strValue)) { - try { - retValue = Integer.parseInt(strValue); - } catch (Throwable t) { - LOG.error("Error parsing integer value. str=" + strValue + ", " + errMessage); - } - } - return retValue; - } public static void logStatForMetric(MetricData metric, String prefixStr, String postFix) { long currStat = metric.value; @@ -232,6 +99,21 @@ public class LogFeederUtil { return gson.fromJson(jsonStr, type); } + public static int objectToInt(Object objValue, int retValue, String errMessage) { + if (objValue == null) { + return retValue; + } + String strValue = objValue.toString(); + if (StringUtils.isNotEmpty(strValue)) { + try { + retValue = Integer.parseInt(strValue); + } catch (Throwable t) { + LOG.error("Error parsing integer value. str=" + strValue + ", " + errMessage); + } + } + return retValue; + } + private static class LogHistory { private long lastLogTime = 0; private int counter = 0; @@ -260,21 +142,4 @@ public class LogFeederUtil { return false; } } - - public static String getClusterName() { - return getStringProperty(CLUSTER_NAME_PROPERTY); - } - - private static String logFeederTempDir = null; - - public synchronized static String getLogFeederTempDir() { - if (logFeederTempDir == null) { - String tempDirValue = getStringProperty(TMP_DIR_PROPERTY, DEFAULT_TMP_DIR); - HashMap<String, String> contextParam = new HashMap<String, String>(); - String username = System.getProperty("user.name"); - contextParam.put("username", username); - logFeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue, contextParam); - } - return logFeederTempDir; - } } http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SSLUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SSLUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SSLUtil.java index 573bb1c..6bcaac9 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SSLUtil.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SSLUtil.java @@ -19,7 +19,6 @@ package org.apache.ambari.logfeeder.util; -import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.ArrayUtils; @@ -29,8 +28,6 @@ import org.apache.log4j.Logger; import java.io.File; import java.nio.charset.Charset; -import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; - public class SSLUtil { private static final Logger LOG = Logger.getLogger(SSLUtil.class); @@ -45,13 +42,6 @@ public class SSLUtil { private static final String KEYSTORE_PASSWORD_FILE = "ks_pass.txt"; private static final String TRUSTSTORE_PASSWORD_FILE = "ts_pass.txt"; - @LogSearchPropertyDescription( - name = "hadoop.security.credential.provider.path", - description = "The jceks file that provides passwords.", - examples = {"jceks://file/etc/ambari-logsearch-logfeeder/conf/logfeeder.jceks"}, - sources = {LOGFEEDER_PROPERTIES_FILE} - ) - private static final String CREDENTIAL_STORE_PROVIDER_PATH = "hadoop.security.credential.provider.path"; private static final String LOGFEEDER_CERT_DEFAULT_FOLDER = "/etc/ambari-logsearch-portal/conf/keys"; private static final String LOGFEEDER_STORE_DEFAULT_PASSWORD = "bigdata"; @@ -111,13 +101,13 @@ public class SSLUtil { private static String getPasswordFromCredentialStore(String propertyName) { try { - String providerPath = LogFeederUtil.getStringProperty(CREDENTIAL_STORE_PROVIDER_PATH); + String providerPath = LogFeederPropertiesUtil.getCredentialStoreProviderPath(); if (providerPath == null) { return null; } Configuration config = new Configuration(); - config.set(CREDENTIAL_STORE_PROVIDER_PATH, providerPath); + config.set(LogFeederPropertiesUtil.CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY, providerPath); char[] passwordChars = config.getPassword(propertyName); return (ArrayUtils.isNotEmpty(passwordChars)) ? new String(passwordChars) : null; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java index c07035b..46abc63 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java @@ -29,7 +29,7 @@ import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.loglevelfilter.FilterLogData; import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler; -import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.ambari.logsearch.config.api.LogSearchConfig; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; import org.apache.commons.lang.time.DateUtils; @@ -61,7 +61,7 @@ public class LogConfigHandlerTest { @BeforeClass public static void init() throws Exception { - LogFeederUtil.loadProperties("logfeeder.properties"); + LogFeederPropertiesUtil.loadProperties("logfeeder.properties"); LogSearchConfig config = strictMock(LogSearchConfig.class); config.createLogLevelFilter(anyString(), anyString(), anyObject(LogLevelFilter.class)); http://git-wip-us.apache.org/repos/asf/ambari/blob/27386c3d/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetricsManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetricsManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetricsManagerTest.java index 4b4067c..24042a7 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetricsManagerTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetricsManagerTest.java @@ -29,7 +29,7 @@ import java.util.Arrays; import java.util.List; import java.util.TreeMap; -import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.junit.Before; @@ -44,7 +44,7 @@ public class MetricsManagerTest { @BeforeClass public static void loadProperties() throws Exception { - LogFeederUtil.loadProperties("logfeeder.properties"); + LogFeederPropertiesUtil.loadProperties("logfeeder.properties"); } @Before