This is an automated email from the ASF dual-hosted git repository. oleewere pushed a commit to branch cloudbreak in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git
commit 54b055fde96bb3124ef2667dcef38a07a05094cf Author: Olivér Szabó <oleew...@gmail.com> AuthorDate: Mon Oct 8 12:58:26 2018 +0200 AMBARI-24743. Cleanup logsearch: add/cleanup javadoc. (#4) * AMBARI-24743. Cleanup logsearch: add/cleanup javadoc. * AMBARI-24743. Add some more Javadocs (mainly for Log Feeder) * AMBARI-24743. Fixxes based on review --- Makefile | 3 + .../logsearch/config/api/InputConfigMonitor.java | 2 +- .../config/api/LogLevelFilterManager.java | 11 ++- .../config/api/LogLevelFilterMonitor.java | 4 + .../config/api/LogLevelFilterUpdater.java | 1 + .../logsearch/config/api/LogSearchConfig.java | 4 +- .../config/api/LogSearchConfigLogFeeder.java | 6 +- .../config/api/LogSearchConfigServer.java | 6 +- .../config/api/LogSearchPropertyDescription.java | 10 +-- .../api/ShipperConfigElementDescription.java | 10 +-- .../config/api/ShipperConfigTypeDescription.java | 4 +- ambari-logsearch-config-json/pom.xml | 2 +- .../config/zookeeper/LogLevelFilterManagerZK.java | 3 + .../zookeeper/LogSearchConfigLogFeederZK.java | 3 + .../config/zookeeper/LogSearchConfigServerZK.java | 3 + .../config/zookeeper/LogSearchConfigZK.java | 3 + .../config/zookeeper/LogSearchConfigZKHelper.java | 23 +++++- .../container/docker/command/ContainerCommand.java | 2 +- .../ambari/logfeeder/plugin/common/AliasUtil.java | 5 +- .../ambari/logfeeder/plugin/common/ConfigItem.java | 22 ++--- .../plugin/common/LogFeederProperties.java | 3 +- .../ambari/logfeeder/plugin/common/MetricData.java | 3 + .../ambari/logfeeder/plugin/filter/Filter.java | 50 +++++++----- .../logfeeder/plugin/filter/mapper/Mapper.java | 45 +++++----- .../ambari/logfeeder/plugin/input/Input.java | 95 ++++++++++++++++------ .../ambari/logfeeder/plugin/input/InputMarker.java | 13 +++ .../logfeeder/plugin/input/cache/LRUCache.java | 11 +++ .../logfeeder/plugin/manager/BlockManager.java | 17 ++++ .../plugin/manager/CheckpointManager.java | 40 ++++++++- .../logfeeder/plugin/manager/InputManager.java | 38 ++++++++- .../logfeeder/plugin/manager/OutputManager.java | 33 ++++++-- .../ambari/logfeeder/plugin/output/Output.java | 61 +++++++++++--- .../ambari/logfeeder/common/ConfigHandler.java | 3 + .../logfeeder/common/LogEntryParseTester.java | 13 ++- .../common/LogFeederSolrClientFactory.java | 11 +++ .../ambari/logfeeder/filter/DockerLogFilter.java | 8 ++ .../apache/ambari/logfeeder/filter/FilterGrok.java | 30 ++++++- .../apache/ambari/logfeeder/filter/FilterJSON.java | 19 +++++ .../ambari/logfeeder/filter/FilterKeyValue.java | 37 ++++++++- .../logfeeder/input/InputConfigUploader.java | 3 + .../apache/ambari/logfeeder/input/InputFile.java | 38 +++++++-- .../ambari/logfeeder/input/InputManagerImpl.java | 43 ++-------- .../apache/ambari/logfeeder/input/InputS3File.java | 14 +++- .../ambari/logfeeder/input/InputSimulate.java | 3 + .../apache/ambari/logfeeder/input/InputSocket.java | 3 + .../logfeeder/input/file/ProcessFileHelper.java | 10 +++ .../file/checkpoint/FileCheckpointManager.java | 3 + .../file/checkpoint/util/CheckpointFileReader.java | 15 ++++ .../file/checkpoint/util/FileCheckInHelper.java | 8 ++ .../util/FileCheckpointCleanupHelper.java | 8 ++ .../checkpoint/util/ResumeLineNumberHelper.java | 9 ++ .../input/monitor/AbstractLogFileMonitor.java | 3 + .../input/monitor/CheckpointCleanupMonitor.java | 3 + .../input/monitor/DockerLogFileUpdateMonitor.java | 17 ++-- .../ambari/logfeeder/input/reader/GZIPReader.java | 3 + .../input/reader/LogsearchReaderFactory.java | 9 ++ .../loglevelfilter/LogLevelFilterHandler.java | 3 + .../ambari/logfeeder/mapper/MapperAnonymize.java | 18 +++- .../apache/ambari/logfeeder/mapper/MapperDate.java | 15 +++- .../ambari/logfeeder/mapper/MapperFieldCopy.java | 14 +++- .../ambari/logfeeder/mapper/MapperFieldName.java | 14 +++- .../ambari/logfeeder/mapper/MapperFieldValue.java | 13 ++- .../apache/ambari/logfeeder/output/OutputFile.java | 2 +- .../ambari/logfeeder/output/OutputHDFSFile.java | 2 +- .../ambari/logfeeder/output/OutputLineFilter.java | 7 +- .../ambari/logfeeder/output/OutputManagerImpl.java | 33 +++----- .../ambari/logfeeder/output/OutputS3File.java | 3 +- .../apache/ambari/logfeeder/output/OutputSolr.java | 38 +++++++-- .../org/apache/ambari/logfeeder/util/S3Util.java | 6 ++ .../logfeeder/mapper/MapperAnonymizeTest.java | 6 +- .../ambari/logfeeder/mapper/MapperDateTest.java | 12 +-- .../logfeeder/mapper/MapperFieldCopyTest.java | 4 +- .../logfeeder/mapper/MapperFieldNameTest.java | 4 +- .../logfeeder/mapper/MapperFieldValueTest.java | 6 +- .../logsearch/common/ExternalServerClient.java | 21 ++--- .../common/LogSearchLdapAuthorityMapper.java | 7 +- .../org/apache/ambari/logsearch/dao/RoleDao.java | 1 + .../handler/AbstractSolrConfigHandler.java | 10 ++- .../org/apache/ambari/logsearch/util/JSONUtil.java | 17 ++-- .../org/apache/ambari/logsearch/util/SolrUtil.java | 4 +- .../logsearch/web/filters/LogsearchKrbFilter.java | 4 - .../web/filters/LogsearchTrustedProxyFilter.java | 14 ++-- .../LogsearchAbstractAuthenticationProvider.java | 1 + 83 files changed, 841 insertions(+), 287 deletions(-) diff --git a/Makefile b/Makefile index 6eb4b22..16d3782 100644 --- a/Makefile +++ b/Makefile @@ -64,6 +64,9 @@ rpm-jdk8: deb-jdk8: $(MAVEN_BINARY) clean package -Dbuild-deb -DskipTests -Djdk.version=1.8 +javadoc: + $(MAVEN_BINARY) javadoc:javadoc + docker-build: $(MAVEN_BINARY) clean package docker:build -DskipTests -Dlogsearch.docker.tag=$(LOGSEARCH_BUILD_DOCKER_TAG) diff --git a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java index 746c14c..baf6c9c 100644 --- a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java +++ b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java @@ -37,7 +37,7 @@ public interface InputConfigMonitor { * * @param serviceName The name of the service for which the input configuration was created. * @param inputConfig The input configuration. - * @throws Exception + * @throws Exception error during loading the input configurations */ void loadInputConfigs(String serviceName, InputConfig inputConfig) throws Exception; diff --git a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterManager.java b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterManager.java index 00df7bf..d75fbf3 100644 --- a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterManager.java +++ b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterManager.java @@ -21,24 +21,27 @@ package org.apache.ambari.logsearch.config.api; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; +/** + * Manage log level filter operations + */ public interface LogLevelFilterManager { /** * Uploads the log level filter of a log. * - * @param clusterName The name of the cluster where the log is. + * @param clusterName The name of the cluster where the logs are located. * @param logId The id of the log. * @param filter The log level filter for the log. - * @throws Exception + * @throws Exception error during creating the log level filter */ void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) throws Exception; /** * Modifies the log level filters for all the logs. * - * @param clusterName The name of the cluster where the logs are. + * @param clusterName The name of the cluster where the logs are located. * @param filters The log level filters to set. - * @throws Exception + * @throws Exception error during setting the log level filter */ void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception; diff --git a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java index 841f09e..1726b3c 100644 --- a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java +++ b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java @@ -26,6 +26,9 @@ import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilte import java.util.Map; +/** + * Implement it to make notifications against log level filter operations. + */ public interface LogLevelFilterMonitor { /** * Notification of a new or updated log level filter. @@ -44,6 +47,7 @@ public interface LogLevelFilterMonitor { /** * Helper function to get all log level filters + * @return get log level filters per log types */ Map<String, LogLevelFilter> getLogLevelFilters(); diff --git a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterUpdater.java b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterUpdater.java index 83cbe81..ddb74e9 100644 --- a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterUpdater.java +++ b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterUpdater.java @@ -61,6 +61,7 @@ public abstract class LogLevelFilterUpdater extends Thread { /** * Periodically check filters from a source (and use log level filter monitor to create/update/delete it) + * @param logLevelFilterMonitor log level filter monitor (as input) which can be used to change state */ protected abstract void checkFilters(final LogLevelFilterMonitor logLevelFilterMonitor); } diff --git a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java index 97eabdf..25071d8 100644 --- a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java +++ b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java @@ -33,17 +33,19 @@ public interface LogSearchConfig extends Closeable { * @param clusterName The name of the cluster where the service is. * @param serviceName The name of the service of which's input configuration is uploaded. * @param inputConfig The input configuration of the service. - * @throws Exception + * @throws Exception error during input creation */ void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception; /** * Get log level filter handler / manager + * @return object that manages log level filter */ LogLevelFilterManager getLogLevelFilterManager(); /** * Set log level filter handler / manager + * @param logLevelFilterManager log level filter manager object */ void setLogLevelFilterManager(LogLevelFilterManager logLevelFilterManager); } diff --git a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java index b3bab77..e0251d3 100644 --- a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java +++ b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java @@ -30,7 +30,7 @@ public interface LogSearchConfigLogFeeder extends LogSearchConfig { * * @param properties The properties of that component. * @param clusterName The name of the cluster. - * @throws Exception + * @throws Exception error during Log Feeder config initialization */ void init(Map<String, String> properties, String clusterName) throws Exception; @@ -39,7 +39,7 @@ public interface LogSearchConfigLogFeeder extends LogSearchConfig { * * @param serviceName The name of the service looked for. * @return If input configuration exists for the service. - * @throws Exception + * @throws Exception error during checking Log Feeder config */ boolean inputConfigExists(String serviceName) throws Exception; @@ -49,7 +49,7 @@ public interface LogSearchConfigLogFeeder extends LogSearchConfig { * @param inputConfigMonitor The input config monitor to call in case of an input config change. * @param logLevelFilterMonitor The log level filter monitor to call in case of a log level filter change. * @param clusterName The name of the cluster. - * @throws Exception + * @throws Exception error during input configuration change */ void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor, String clusterName) throws Exception; diff --git a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServer.java b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServer.java index d269c5a..1727ceb 100644 --- a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServer.java +++ b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServer.java @@ -33,7 +33,7 @@ public interface LogSearchConfigServer extends LogSearchConfig { * Initialization of the configuration. * * @param properties The properties of that component. - * @throws Exception + * @throws Exception error during Log Search configuration initialization */ void init(Map<String, String> properties) throws Exception; @@ -51,7 +51,7 @@ public interface LogSearchConfigServer extends LogSearchConfig { * @param clusterName The name of the cluster where the service is looked for. * @param serviceName The name of the service looked for. * @return If input configuration exists for the service. - * @throws Exception + * @throws Exception error during input configuration check */ boolean inputConfigExists(String clusterName, String serviceName) throws Exception; @@ -69,7 +69,7 @@ public interface LogSearchConfigServer extends LogSearchConfig { * @param clusterName The name of the cluster where the service is. * @param serviceName The name of the service of which's input configuration is uploaded. * @param inputConfig The input configuration of the service. - * @throws Exception + * @throws Exception error during setting input configuration */ void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception; diff --git a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchPropertyDescription.java b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchPropertyDescription.java index 330ef5c..e4da48f 100644 --- a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchPropertyDescription.java +++ b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchPropertyDescription.java @@ -32,27 +32,27 @@ import java.lang.annotation.Target; public @interface LogSearchPropertyDescription { /** - * Name of the property inside the application level property file. + * @return Name of the property inside the application level property file. */ String name(); /** - * Describe what the property used for. + * @return Describe what the property used for. */ String description(); /** - * An example value for the property. + * @return Example values for the property. */ String[] examples(); /** - * Default value of the property, emtpy by default. + * @return Default value of the property, emtpy by default. */ String defaultValue() default ""; /** - * Name of the property files where the configurations located + * @return Name of the property files where the configurations located */ String[] sources(); diff --git a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/ShipperConfigElementDescription.java b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/ShipperConfigElementDescription.java index d65bf8e..577dab9 100644 --- a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/ShipperConfigElementDescription.java +++ b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/ShipperConfigElementDescription.java @@ -32,27 +32,27 @@ import java.lang.annotation.Target; public @interface ShipperConfigElementDescription { /** - * The path of the json element. + * @return The path of the json element. */ String path(); /** - * The type of the json element. + * @return The type of the json element. */ String type(); /** - * Describe what the json element is used for. + * @return Describe what the json element is used for. */ String description(); /** - * An example value for the element, if applicable. + * @return Example values for the element, if applicable. */ String[] examples() default {}; /** - * Default value of the json element, if applicable. + * @return Default value of the json element, if applicable. */ String defaultValue() default ""; diff --git a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/ShipperConfigTypeDescription.java b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/ShipperConfigTypeDescription.java index 1c112d8..96b79bb 100644 --- a/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/ShipperConfigTypeDescription.java +++ b/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/ShipperConfigTypeDescription.java @@ -32,12 +32,12 @@ import java.lang.annotation.Target; public @interface ShipperConfigTypeDescription { /** - * The name of the element type. + * @return The name of the element type. */ String name(); /** - * The description of the json element. + * @return The description of the json element. */ String description(); diff --git a/ambari-logsearch-config-json/pom.xml b/ambari-logsearch-config-json/pom.xml index 9b33fe0..05565bd 100644 --- a/ambari-logsearch-config-json/pom.xml +++ b/ambari-logsearch-config-json/pom.xml @@ -43,7 +43,7 @@ <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> - <version>3.4</version> + <version>3.6</version> </dependency> <dependency> <groupId>commons-collections</groupId> diff --git a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java index 81c3f23..fd08e07 100644 --- a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java +++ b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java @@ -34,6 +34,9 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +/** + * Managing log level filters in ZooKeeper (store them in ZNodes per cluster). Operations: create / get / update log level filters. + */ public class LogLevelFilterManagerZK implements LogLevelFilterManager { private static final Logger logger = LogManager.getLogger(LogLevelFilterManagerZK.class); diff --git a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java index 6fc2ea0..9c8870f 100644 --- a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java +++ b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java @@ -44,6 +44,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.zookeeper.KeeperException; +/** + * ZooKeeper related shipper configuration manager/listener for Log Feeder. On input changes, operations are passed to a monitor interface. + */ public class LogSearchConfigLogFeederZK extends LogSearchConfigZK implements LogSearchConfigLogFeeder { private static final Logger logger = LogManager.getLogger(LogSearchConfigLogFeederZK.class); diff --git a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java index 5bcdefc..090f215 100644 --- a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java +++ b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java @@ -37,6 +37,9 @@ import com.google.gson.JsonParser; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +/** + * ZooKeeper related shipper configuration operations for Log Search Server. + */ public class LogSearchConfigServerZK extends LogSearchConfigZK implements LogSearchConfigServer { private static final Logger logger = LogManager.getLogger(LogSearchConfigServerZK.class); diff --git a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java index d29da94..556c033 100644 --- a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java +++ b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java @@ -30,6 +30,9 @@ import org.apache.zookeeper.KeeperException.NodeExistsException; import com.google.gson.Gson; +/** + * Common ZooKeeper related shipper configuration operations that can be used by both Log Feeder and Log Search server. + */ public class LogSearchConfigZK implements LogSearchConfig { private static final Logger logger = LogManager.getLogger(LogSearchConfigZK.class); diff --git a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZKHelper.java b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZKHelper.java index de6db9a..638e7b6 100644 --- a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZKHelper.java +++ b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZKHelper.java @@ -116,6 +116,8 @@ public class LogSearchConfigZKHelper { /** * Create ZK curator client from a configuration (map holds the configs for that) + * @param properties key/value pairs that holds configurations for the zookeeper client + * @return zookeeper client */ public static CuratorFramework createZKClient(Map<String, String> properties) { String root = MapUtils.getString(properties, ZK_ROOT_NODE_PROPERTY, DEFAULT_ZK_ROOT); @@ -130,6 +132,8 @@ public class LogSearchConfigZKHelper { /** * Get ACLs from a property (get the value then parse and transform it as ACL objects) + * @param properties key/value pairs that needs to be parsed as ACLs + * @return list of ACLs */ public static List<ACL> getAcls(Map<String, String> properties) { String aclStr = properties.get(ZK_ACLS_PROPERTY); @@ -165,6 +169,10 @@ public class LogSearchConfigZKHelper { /** * Create listener for znode of log level filters - can be used for Log Feeder as it can be useful if it's monitoring the log level changes + * @param clusterName name of the cluster + * @param gson object to be used for json serialization + * @param logLevelFilterMonitor log level filter monitor object that can be used to do something during znode chagne + * @return listener response */ public static TreeCacheListener createTreeCacheListener(String clusterName, Gson gson, LogLevelFilterMonitor logLevelFilterMonitor) { return new TreeCacheListener() { @@ -188,6 +196,9 @@ public class LogSearchConfigZKHelper { /** * Create root + cluster name znode cache + * @param client zookeeper client + * @param clusterName name of the cluster + * @return znode cache */ public static TreeCache createClusterCache(CuratorFramework client, String clusterName) { return new TreeCache(client, String.format("/%s", clusterName)); @@ -195,6 +206,9 @@ public class LogSearchConfigZKHelper { /** * Assign listener to cluster cache and start to use that listener + * @param clusterCache zookeeper znode cache (cluster) + * @param listener znode cache listener - trigger on events + * @throws Exception error during assinging the listener to the cache */ public static void addAndStartListenersOnCluster(TreeCache clusterCache, TreeCacheListener listener) throws Exception { clusterCache.getListenable().addListener(listener); @@ -210,8 +224,13 @@ public class LogSearchConfigZKHelper { /** * Call log level filter monitor interface to handle node related operations (on update/remove) + * @param eventType zookeeper event type (add/update/remove) + * @param nodeName name of the znode + * @param nodeData znode data + * @param gson object that can serialize inputs + * @param logLevelFilterMonitor monitor object that can pass business logic that should happen during znode events */ - public static void handleLogLevelFilterChange(final TreeCacheEvent.Type eventType, final String nodeName, final String nodeData, + static void handleLogLevelFilterChange(final TreeCacheEvent.Type eventType, final String nodeName, final String nodeData, final Gson gson, final LogLevelFilterMonitor logLevelFilterMonitor) { switch (eventType) { case NODE_ADDED: @@ -231,6 +250,8 @@ public class LogSearchConfigZKHelper { /** * Pares ZK ACL permission string and transform it to an integer + * @param permission string input (permission) that will be transformed to an integer + * @return Integer code of a zookeeper ACL */ public static Integer parsePermission(String permission) { int permissionCode = 0; diff --git a/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/container/docker/command/ContainerCommand.java b/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/container/docker/command/ContainerCommand.java index 92c24ee..98ade0e 100644 --- a/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/container/docker/command/ContainerCommand.java +++ b/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/container/docker/command/ContainerCommand.java @@ -22,7 +22,7 @@ import java.util.Map; /** * Responsible of execute container commands. (like listing or inspecting containers) - * @param <RESPONSE_TYPE> + * @param <RESPONSE_TYPE> object that should be returned by a container command */ public interface ContainerCommand<RESPONSE_TYPE> { diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/AliasUtil.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/AliasUtil.java index fc93dab..3b879d7 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/AliasUtil.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/AliasUtil.java @@ -31,6 +31,9 @@ import java.io.IOException; import java.io.InputStream; import java.util.HashMap; +/** + * Helper class to map input/filter/outputs java classes to names, which can be used in input configurations. + */ public class AliasUtil { private static final Logger logger = LogManager.getLogger(AliasUtil.class); @@ -116,7 +119,7 @@ public class AliasUtil { return aliasInfo; } - public static HashMap<String, Object> getJsonFileContentFromClassPath(String fileName) { + private static HashMap<String, Object> getJsonFileContentFromClassPath(String fileName) { ObjectMapper mapper = new ObjectMapper(); try (InputStream inputStream = AliasUtil.class.getClassLoader().getResourceAsStream(fileName)) { return mapper.readValue(inputStream, new TypeReference<HashMap<String, Object>>() {}); diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java index 76fcf3f..27bb04f 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java @@ -30,6 +30,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +/** + * This class is used to gather json configs for Log Feeder shipper configurations. Get specific properties with specific types (key/values pairs) + * @param <PROP_TYPE> Log Feeder configuration holder object + */ public abstract class ConfigItem<PROP_TYPE extends LogFeederProperties> implements Cloneable, Serializable { private static final Logger logger = LogManager.getLogger(ConfigItem.class); @@ -49,7 +53,8 @@ public abstract class ConfigItem<PROP_TYPE extends LogFeederProperties> implemen public abstract void init(PROP_TYPE logFeederProperties) throws Exception; /** - * Used while logging. Keep it short and meaningful + * Get description of config item (input / output / filter) + * @return String value used while logging. Keep it short and meaningful */ public abstract String getShortDescription(); @@ -59,11 +64,6 @@ public abstract class ConfigItem<PROP_TYPE extends LogFeederProperties> implemen public void loadConfig(Map<String, Object> map) { configs = cloneObject(map); - - Map<String, String> nvList = getNVList("add_fields"); - if (nvList != null) { - contextFields.putAll(nvList); - } } @SuppressWarnings("unchecked") @@ -91,7 +91,7 @@ public abstract class ConfigItem<PROP_TYPE extends LogFeederProperties> implemen logStatForMetric(statMetric, "Stat"); } - public void logStatForMetric(MetricData metric, String prefixStr) { + protected void logStatForMetric(MetricData metric, String prefixStr) { long currStat = metric.value; long currMS = System.currentTimeMillis(); String postFix = ", key=" + getShortDescription(); @@ -194,12 +194,4 @@ public abstract class ConfigItem<PROP_TYPE extends LogFeederProperties> implemen return gson.fromJson(jsonStr, type); } - private Object getValue(String property) { - return configs.get(property); - } - - private Object getValue(String property, Object defaultValue) { - return configs.getOrDefault(property, defaultValue); - } - } diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java index 7fac01a..abdec7a 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java @@ -27,7 +27,8 @@ import java.util.Properties; public interface LogFeederProperties extends Serializable { /** - * Get all key-value pairs from static application level Log Feeder configuration + * Get static application level Log Feeder configuration + * @return Log Feeder configuration (key-value pairs) */ Properties getProperties(); } diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java index 54cdb7e..b00e048 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java @@ -20,6 +20,9 @@ package org.apache.ambari.logfeeder.plugin.common; import java.io.Serializable; +/** + * Holds Log Feeder metrics data + */ public class MetricData implements Serializable { public final String metricsName; public final boolean isPointInTime; diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java index 8f0fa71..b0c02a6 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java @@ -37,40 +37,36 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +/** + * Represents the filter in Log Feeder shipper input configurations. + * At least 1 filter is required for a valid input config. + * Can transform inputs (adding/removing/create fields), those will be shipped to outputs or other filters (in chain) + * @param <PROP_TYPE> Log Feeder configuration holder object + */ public abstract class Filter<PROP_TYPE extends LogFeederProperties> extends ConfigItem<PROP_TYPE> { private static final Logger logger = LogManager.getLogger(Filter.class); private final Map<String, List<Mapper>> postFieldValueMappers = new HashMap<>(); private FilterDescriptor filterDescriptor; - private PROP_TYPE logFeederProperties; private Filter nextFilter = null; private Input input; private OutputManager outputManager; - public void loadConfigs(FilterDescriptor filterDescriptor, PROP_TYPE logFeederProperties, OutputManager outputManager) { - this.filterDescriptor = filterDescriptor; - this.logFeederProperties = logFeederProperties; - this.outputManager = outputManager; - } - public FilterDescriptor getFilterDescriptor() { return filterDescriptor; } - public PROP_TYPE getLogFeederProperties() { - return logFeederProperties; - } - + @SuppressWarnings("unchecked") @Override public void init(PROP_TYPE logFeederProperties) throws Exception { - initializePostMapValues(); + initializePostMapValues(logFeederProperties); if (nextFilter != null) { nextFilter.init(logFeederProperties); } } - private void initializePostMapValues() { + private void initializePostMapValues(PROP_TYPE logFeederProperties) { Map<String, ? extends List<? extends PostMapValues>> postMapValues = filterDescriptor.getPostMapValues(); if (postMapValues == null) { return; @@ -85,7 +81,7 @@ public abstract class Filter<PROP_TYPE extends LogFeederProperties> extends Conf logger.warn("Unknown mapper type: " + mapClassCode); continue; } - if (mapper.init(getInput().getShortDescription(), fieldName, mapClassCode, mapFieldDescriptor)) { + if (mapper.init(logFeederProperties, getInput().getShortDescription(), fieldName, mapClassCode, mapFieldDescriptor)) { List<Mapper> fieldMapList = postFieldValueMappers.computeIfAbsent(fieldName, k -> new ArrayList<>()); fieldMapList.add(mapper); } @@ -95,7 +91,10 @@ public abstract class Filter<PROP_TYPE extends LogFeederProperties> extends Conf } /** - * Deriving classes should implement this at the minimum + * Apply a filter on an input (input can be an output of an another filter). Deriving classes should implement this at the minimum. + * @param inputStr Incoming input as a string + * @param inputMarker Marker which can identify a specific input (like line number + input details) + * @throws Exception Any error which happens during applying the filter */ public void apply(String inputStr, InputMarker inputMarker) throws Exception { // TODO: There is no transformation for string types. @@ -106,6 +105,12 @@ public abstract class Filter<PROP_TYPE extends LogFeederProperties> extends Conf } } + /** + * Apply a filter on an input (input can be an output of an another filter). + * @param jsonObj Key/value pairs of incoming inputs - mostly fields and values + * @param inputMarker Marker which can identify a specific input (like line number + input details) + * @throws Exception Any error which happens during applying the filter + */ public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception { for (String fieldName : postFieldValueMappers.keySet()) { Object value = jsonObj.get(fieldName); @@ -122,6 +127,10 @@ public abstract class Filter<PROP_TYPE extends LogFeederProperties> extends Conf } } + /** + * Set filter descriptor shipper configuration object for the filter + * @param filterDescriptor Filter descriptor, stores filter configurations + */ public void loadConfig(FilterDescriptor filterDescriptor) { this.filterDescriptor = filterDescriptor; } @@ -142,18 +151,21 @@ public abstract class Filter<PROP_TYPE extends LogFeederProperties> extends Conf this.input = input; } - public OutputManager getOutputManager() { - return outputManager; - } - public void setOutputManager(OutputManager outputManager) { this.outputManager = outputManager; } + /** + * Call flush on a filter - implement only if any kind of flush is required for the resources of a filter, which is different from the close operation. + */ public void flush() { // empty } + /** + * Implement this for specific filter if it is required to close resources properly. By default it tries to close the next chained filter. + * (you can keep this behaviour if you are using with super.close() ) + */ public void close() { if (nextFilter != null) { nextFilter.close(); diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/mapper/Mapper.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/mapper/Mapper.java index d52bc01..6baf4c4 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/mapper/Mapper.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/mapper/Mapper.java @@ -23,11 +23,12 @@ import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescript import java.util.Map; +/** + * Mapper is used do mapping on specific fields which can be generated / gathered by Log Feeder filters + * @param <PROP_TYPE> Log Feeder configuration holder object + */ public abstract class Mapper<PROP_TYPE extends LogFeederProperties> { - private MapFieldDescriptor mapFieldDescriptor; - private PROP_TYPE logFeederProperties; - private String inputDesc; private String fieldName; private String mapClassCode; @@ -38,35 +39,29 @@ public abstract class Mapper<PROP_TYPE extends LogFeederProperties> { this.mapClassCode = mapClassCode; } - public void loadConfigs(MapFieldDescriptor mapFieldDescriptor, PROP_TYPE logFeederProperties) { - this.mapFieldDescriptor = mapFieldDescriptor; - this.logFeederProperties = logFeederProperties; - } - - public MapFieldDescriptor getMapFieldDescriptor() { - return mapFieldDescriptor; - } - - public PROP_TYPE getLogFeederProperties() { - return logFeederProperties; - } - - public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor); + /** + * Initialize the mapper + * @param logFeederProperties holds global logfeeder properties + * @param inputDesc input description + * @param fieldName field name + * @param mapClassCode mapper type - to identify a mapper + * @param mapFieldDescriptor mapper field descriptor + * @return true if initialization is successful + */ + public abstract boolean init(PROP_TYPE logFeederProperties, String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor); + /** + * Apply mapper using fields (key / value pairs) + * @param jsonObj key/value pairs - holds fields an their values + * @param value object that is applied on the field + * @return result after the apply + */ public abstract Object apply(Map<String, Object> jsonObj, Object value); - public String getInputDesc() { - return inputDesc; - } - public String getFieldName() { return fieldName; } - public String getMapClassCode() { - return mapClassCode; - } - @Override public String toString() { return "mapClass=" + mapClassCode + ", input=" + inputDesc + ", fieldName=" + fieldName; diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java index 421ca86..6228637 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java @@ -38,12 +38,25 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +/** + * Represents an input which can be monitored and sends through filters to outputs. + * Usage flow: + * <pre> + * 1. Checks input is ready or not + * 2. Call monitor if input is ready + * 3. Monitor method can start threads from the input + * 4. The thread run command can use start() + * 5. Call close if thread is interrupted or finished or set to be drained + * </pre> + * @param <PROP_TYPE> Log Feeder configuration holder object + * @param <INPUT_MARKER> Type of the input marker - can be anything which can store unique data about an input + * @param <INPUT_DESC_TYPE> Descriptor type from the shipper configuration - use this to access input details. + */ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER extends InputMarker, INPUT_DESC_TYPE extends InputDescriptor> extends ConfigItem<PROP_TYPE> implements Runnable { private static final Logger logger = LogManager.getLogger(Input.class); private INPUT_DESC_TYPE inputDescriptor; - private PROP_TYPE logFeederProperties; private LogSearchConfigLogFeeder logSearchConfig; private InputManager inputManager; private OutputManager outputManager; @@ -58,41 +71,53 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER private LRUCache cache; private String cacheKeyField; private boolean initDefaultFields; - protected MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false); - - public void loadConfigs(INPUT_DESC_TYPE inputDescriptor, PROP_TYPE logFeederProperties, - InputManager inputManager, OutputManager outputManager) { - this.inputDescriptor = inputDescriptor; - this.logFeederProperties = logFeederProperties; - this.inputManager = inputManager; - this.outputManager = outputManager; - } - - public void setLogSearchConfig(LogSearchConfigLogFeeder logSearchConfig) { - this.logSearchConfig = logSearchConfig; - } - - public LogSearchConfigLogFeeder getLogSearchConfig() { - return logSearchConfig; - } + private MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false); + /** + * Start monitor an input, it should depend on the input is ready or not, if it is ready and can be monitored it will return true. + * That method should create new threads for the input object and call start() method on it. (one input can be cloned, e.g.: if using wildcards for an input, it should start multiple threads) + * @return Flags that the input can be monitored or not. + */ public abstract boolean monitor(); public abstract INPUT_MARKER getInputMarker(); + /** + * Check the input is ready for monitoring or not + * @return input state + */ public abstract boolean isReady(); + /** + * Set the input state, if it set to true, input can be monitored. + * @param isReady input state + */ public abstract void setReady(boolean isReady); + /** + * Dump input data pointer e.g.: save line number for a file input - it can be used later to start monitoring from the right place after restart. + * @param inputMarker Type of the input marker - can be anything which can store unique data about an input + */ public abstract void checkIn(INPUT_MARKER inputMarker); + /** + * Call last check in during shutdown. + */ public abstract void lastCheckIn(); + /** + * Obtain read bytes metric name - if there are any metric sinks in the application it can identify the specific metric for the input + * @return metric name + */ public abstract String getReadBytesMetricName(); - public PROP_TYPE getLogFeederProperties() { - return logFeederProperties; - } + /** + * This method will be called from the thread spawned for the output. This + * method should only exit after all data are read from the source or the + * process is exiting + * @throws Exception Error during starting the specific input monitoring thread + */ + public abstract void start() throws Exception; public INPUT_DESC_TYPE getInputDescriptor() { return inputDescriptor; @@ -114,10 +139,15 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER this.inputManager = inputManager; } + /** + * Bound an output (destination) for the input + * @param output input destination + */ public void addOutput(Output output) { outputList.add(output); } + @SuppressWarnings("unchecked") public void addFilter(Filter filter) { if (firstFilter == null) { firstFilter = filter; @@ -176,12 +206,10 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER } /** - * This method will be called from the thread spawned for the output. This - * method should only exit after all data are read from the source or the - * process is exiting + * Process a small chunk of the input. (e.g.: process 1 line) It should send the data through filters before the output destination. + * @param line log text input to be processed + * @param marker input marker that stores input details */ - public abstract void start() throws Exception; - public void outputLine(String line, INPUT_MARKER marker) { statMetric.value++; readBytesMetric.value += (line.length()); @@ -198,6 +226,9 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER } } + /** + * Call close on input, it should flag filters to be closed as well + */ public void close() { logger.info("Close called. " + getShortDescription()); try { @@ -215,10 +246,22 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER } } + public void setLogSearchConfig(LogSearchConfigLogFeeder logSearchConfig) { + this.logSearchConfig = logSearchConfig; + } + + public LogSearchConfigLogFeeder getLogSearchConfig() { + return logSearchConfig; + } + public void loadConfig(INPUT_DESC_TYPE inputDescriptor) { this.inputDescriptor = inputDescriptor; } + /** + * Set the input to be closed, if it is set to true, during input process, the monitoring thread should be finished. + * @param isClosed Flag input to be closed. + */ public void setClosed(boolean isClosed) { this.isClosed = isClosed; } diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/InputMarker.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/InputMarker.java index aa54019..078ff5e 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/InputMarker.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/InputMarker.java @@ -20,10 +20,23 @@ package org.apache.ambari.logfeeder.plugin.input; import java.util.Map; +/** + * This interface stores unique data about an input. + * @param <INPUT_TYPE> Type of the input + */ public interface InputMarker <INPUT_TYPE extends Input> { + + /** + * Get the input for an input marker + * @return An Log Feeder input instance + */ INPUT_TYPE getInput(); + /** + * Get input marker properties + * @return marker properties which represents an input + */ Map<String, Object> getAllProperties(); } diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java index 5e13811..2d69326 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java @@ -47,6 +47,17 @@ public class LRUCache implements Serializable { }; } + /** + * Check that an entry can be replaced with new values. + * <ol> + * <li> If key does not exist in the cache, entry can be replaced (it will be a new one) </li> + * <li> If "last" de-duplication is enabled, do not replace key as that is the most recent element </li> + * <li> If key exists and "last" de-duplication disabled, replace an entry only if the right interval passed between old and new values (as values are long timestamps) </li> + * </ol> + * @param key Key of the cache entry + * @param value Value of the cache entry + * @return Entry replaceable or not + */ public boolean isEntryReplaceable(String key, Long value) { boolean result = true; Long existingValue = keyValueMap.get(key); diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/BlockManager.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/BlockManager.java index 674f51f..dcdeb58 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/BlockManager.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/BlockManager.java @@ -22,14 +22,31 @@ import org.apache.ambari.logfeeder.plugin.common.MetricData; import java.util.List; +/** + * Stores common operations for input and output managers + */ public interface BlockManager { + /** + * Init input or output configuration block + * @throws Exception Error during initialization + */ void init() throws Exception; + /** + * Close input or output manager + */ void close(); + /** + * Log Statistics - needs to be implemented + */ void logStats(); + /** + * Adding a list of metrics to input or output manager, which can be processed (if implemented) + * @param metricsList List of metrics + */ void addMetricsContainers(List<MetricData> metricsList); } diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/CheckpointManager.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/CheckpointManager.java index abf1465..f34baf6 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/CheckpointManager.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/CheckpointManager.java @@ -24,19 +24,57 @@ import org.apache.ambari.logfeeder.plugin.input.InputMarker; import java.io.IOException; +/** + * Handle checkpoints for inputs, that can be used to store data about the inputs which can be used to process them proerly even if Log Feeder was restarted. + * @param <I> type of the input + * @param <IFM> input marker type - can store unique input data + * @param <P> object that holds global Log Feeder configurations + */ public interface CheckpointManager<I extends Input, IFM extends InputMarker, P extends LogFeederProperties> { + /** + * Init checkpoint manager. + * @param properties key/value pairs that can be used to configure checkpoint manager + */ void init(P properties); - void checkIn(I inputFile, IFM inputMarker); + /** + * Save an input pointer (e.g.: save line numbers in a file with some input identifiers) + * @param input input to be checked in by the checkpoint manager + * @param inputMarker input marker, can store unique input details + */ + void checkIn(I input, IFM inputMarker); + /** + * Resume input by checkpoints - get the line number + * @param input that should be resumed (processing) + * @return line number + */ int resumeLineNumber(I input); + /** + * Delete checkpoints by the checkpoint manager (e.g.: deleted dumped input details with line number data etc.) + */ void cleanupCheckpoints(); + /** + * Print checkpoint informations. + * @param checkpointLocation location of the checkpoint file + * @param logTypeFilter type of the input (input groups, like hdfs_namenode can be an input type) + * @param fileKeyFilter file key which can identify the input and checkpoint file + * @throws IOException error during printing a checkpoint + */ void printCheckpoints(String checkpointLocation, String logTypeFilter, String fileKeyFilter) throws IOException; + /** + * Clean a checkpoint by checkpoint manager. + * @param checkpointLocation location of the checkpoint file + * @param logTypeFilter type of the input (input groups, like hdfs_namenode can be an input type) + * @param fileKeyFilter file key which can identify the input and checkpoint file + * @param all flag to cleanup all checkpoints for a specific log type + * @throws IOException error during cleaning up a checkpoint + */ void cleanCheckpoint(String checkpointLocation, String logTypeFilter, String fileKeyFilter, boolean all) throws IOException; diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java index 6dc1423..ad7310d 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java @@ -20,25 +20,59 @@ package org.apache.ambari.logfeeder.plugin.manager; import org.apache.ambari.logfeeder.plugin.input.Input; -import java.io.File; import java.util.List; - +/** + * Holds input objects for Log Feeder and start monitoring them if those are in "ready" state. + */ public abstract class InputManager implements BlockManager { + /** + * Add a new input to not ready list (from that point, input manager will check inputs are ready or not, if an input is ready, start monitoring it) + * @param input input type + */ public abstract void addToNotReady(Input input); + /** + * Check in all inputs. (dump details for every inputs) + */ public abstract void checkInAll(); + /** + * Get all input objects (1 input can have more sub-thread inputs) + * @param serviceName input type + * @return list of inputs + */ public abstract List<Input> getInputList(String serviceName); + /** + * Add a new input object + * @param serviceName input type + * @param input input object + */ public abstract void add(String serviceName, Input input); + /** + * Remove an input + * @param input input object + */ public abstract void removeInput(Input input); + /** + * Remove an input identified by the input type + * @param serviceName input type + */ public abstract void removeInputsForService(String serviceName); + /** + * Check inputs are ready, if they are, start monitoring them. + * @param serviceName input type + */ public abstract void startInputs(String serviceName); + /** + * Get checkpoint handler which can be used to check in data for inputs during processing them. + * @return checkpoint manager + */ public abstract CheckpointManager getCheckpointHandler(); } diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java index 3a3c601..7ab8825 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java @@ -20,24 +20,47 @@ package org.apache.ambari.logfeeder.plugin.manager; import org.apache.ambari.logfeeder.plugin.input.InputMarker; import org.apache.ambari.logfeeder.plugin.output.Output; -import org.apache.ambari.logsearch.config.api.OutputConfigMonitor; import java.io.File; import java.util.List; import java.util.Map; +/** + * Holds output objects for Log Feeder and handle write operations for them based on the inputs/filters + */ public abstract class OutputManager implements BlockManager { - public abstract void write(Map<String, Object> jsonObj, InputMarker inputMarker); + /** + * Write map object (based on input/output descriptions) + * @param jsonObj json object (key/value pairs) that will be sent to an output destination + * @param marker holds unique input details + */ + public abstract void write(Map<String, Object> jsonObj, InputMarker marker); - public abstract void write(String jsonBlock, InputMarker inputMarker); + /** + * Write text (based on input/output descriptions) + * @param jsonBlock json string that will be sent to an output destination + * @param marker holds unique input details + */ + public abstract void write(String jsonBlock, InputMarker marker); + /** + * Copy an input file to a specific destination + * @param file object that holds a file + * @param marker holds unique input details + */ public abstract void copyFile(File file, InputMarker marker); + /** + * Add an output which will be hold by this class. + * @param output output object + */ public abstract void add(Output output); + /** + * Get all outputs + * @return output object list + */ public abstract List<Output> getOutputs(); - public abstract List<? extends OutputConfigMonitor> getOutputsToMonitor(); - } diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java index 52c5435..73caf68 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java @@ -34,6 +34,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +/** + * Output is responsible about to ship transformed inputs to a destination which should be implemented by extending this class. + * @param <PROP_TYPE> Log Feeder configuration holder object + * @param <INPUT_MARKER> Type of the input marker - can be anything which can store unique data about an input + */ public abstract class Output<PROP_TYPE extends LogFeederProperties, INPUT_MARKER extends InputMarker> extends ConfigItem<PROP_TYPE> implements OutputConfigMonitor { private static final Logger LOG = LogManager.getLogger(Output.class); @@ -46,23 +51,53 @@ public abstract class Output<PROP_TYPE extends LogFeederProperties, INPUT_MARKER private boolean isClosed; protected MetricData writeBytesMetric = new MetricData(getWriteBytesMetricName(), false); + /** + * Obtain the output type + * @return Text which represents the output type in shipper configuration. (e.g.: "solr") + */ public abstract String getOutputType(); + /** + * Copy input file, can be used instead or with processing an input + * @param inputFile File to copy + * @param inputMarker Marker that stores input details + * @throws Exception Error during input copy + */ public abstract void copyFile(File inputFile, InputMarker inputMarker) throws Exception; + /** + * Call write operation - should ship inputs to a destination + * @param jsonStr Input string to process (JSON string) + * @param inputMarker Marker that stores input details + * @throws Exception Error during output writing + */ public abstract void write(String jsonStr, INPUT_MARKER inputMarker) throws Exception; + /** + * Get pending output count + * @return Pending outputs (used during closing the outputs - mainly at shutdown phase) + */ public abstract Long getPendingCount(); + /** + * Obtain writes metric name - if there are any metric sinks in the application it can identify the specific metric for the output + * @return metric name + */ public abstract String getWriteBytesMetricName(); - public String getNameForThread() { - return this.getClass().getSimpleName(); + /** + * Converts key/value map to JSON string and call write() on that input + * @param jsonObj Key/value map which contains the fields to process + * @param inputMarker Marker that stores input details + * @throws Exception Error during output writing + */ + public void write(Map<String, Object> jsonObj, INPUT_MARKER inputMarker) throws Exception { + write(gson.toJson(jsonObj), inputMarker); } - public boolean monitorConfigChanges() { - return false; - }; + protected String getNameForThread() { + return this.getClass().getSimpleName(); + } public void setLogSearchConfig(LogSearchConfigLogFeeder logSearchConfig) { this.logSearchConfig = logSearchConfig; @@ -82,6 +117,7 @@ public abstract class Output<PROP_TYPE extends LogFeederProperties, INPUT_MARKER /** * Get the list of fields that will be used for ID generation of log entries. + * @return list of string */ public List<String> getIdFields() { return new ArrayList<>(); @@ -91,12 +127,11 @@ public abstract class Output<PROP_TYPE extends LogFeederProperties, INPUT_MARKER return isClosed; } - public void setClosed(boolean closed) { - isClosed = closed; - } - - public void write(Map<String, Object> jsonObj, INPUT_MARKER inputMarker) throws Exception { - write(gson.toJson(jsonObj), inputMarker); + /** + * Flag an output to be closed + */ + protected void shouldCloseOutput() { + isClosed = true; } @Override @@ -117,12 +152,12 @@ public abstract class Output<PROP_TYPE extends LogFeederProperties, INPUT_MARKER return true; } - public void trimStrValue(Map<String, Object> jsonObj) { + protected void trimStrValue(Map<String, Object> jsonObj) { if (jsonObj != null) { for (Map.Entry<String, Object> entry : jsonObj.entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); - if (value != null && value instanceof String) { + if (value instanceof String) { String valueStr = value.toString().trim(); jsonObj.put(key, valueStr); } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java index 67a5671..61f726c 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java @@ -63,6 +63,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +/** + * Initialize / close input and output managers and monitors input configuration changes. + */ public class ConfigHandler implements InputConfigMonitor { private static final Logger logger = LogManager.getLogger(ConfigHandler.class); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java index b000aed..b4a2a26 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java @@ -45,6 +45,9 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +/** + * Class for testing log parsing by test entries and service configs + */ public class LogEntryParseTester { private final String logEntry; @@ -78,6 +81,12 @@ public class LogEntryParseTester { } } + /** + * It tries to parse log line entry against the Log Feeder configurations (required inputs: log type (id), log entry, shipper config and a global config) + * @return result of the parsing - key / value pairs (fields and values) + * @throws Exception error that happens during log line parsing + */ + @SuppressWarnings("unchecked") public Map<String, Object> parse() throws Exception { InputConfig inputConfig = getInputConfig(); ConfigHandler configHandler = new ConfigHandler(null); @@ -99,7 +108,7 @@ public class LogEntryParseTester { input.getFirstFilter().init(logFeederProps); input.addOutput(new Output<LogFeederProps, InputFileMarker>() { @Override - public void init(LogFeederProps logFeederProperties) throws Exception { + public void init(LogFeederProps logFeederProperties) { } @Override @@ -134,7 +143,7 @@ public class LogEntryParseTester { @Override public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException { } - + @Override public void write(Map<String, Object> jsonObj, InputFileMarker inputMarker) { result.putAll(jsonObj); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederSolrClientFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederSolrClientFactory.java index 5018d48..dec3007 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederSolrClientFactory.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederSolrClientFactory.java @@ -25,10 +25,21 @@ import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.LBHttpSolrClient; +/** + * Factory for creating specific Solr clients based on provided configurations (simple / LB or cloud Solr client) + */ public class LogFeederSolrClientFactory { private static final Logger logger = LogManager.getLogger(LogFeederSolrClientFactory.class); + /** + * Creates a new Solr client. If solr urls are provided create a LB client (Use simple Http client if only 1 provided), + * otherwise create a cloud client. That means at least providing zookeeper connection string or Solr urls are required. + * @param zkConnectionString zookeeper connection string, e.g.: localhost1:2181,localhost2:2181/solr + * @param solrUrls list of solr urls + * @param collection name of the Solr collection + * @return created client + */ public SolrClient createSolrClient(String zkConnectionString, String[] solrUrls, String collection) { logger.info("Creating solr client ..."); logger.info("Using collection=" + collection); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/DockerLogFilter.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/DockerLogFilter.java index ab13775..485a4fc 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/DockerLogFilter.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/DockerLogFilter.java @@ -22,11 +22,19 @@ import org.apache.ambari.logfeeder.util.LogFeederUtil; import java.util.Map; +/** + * Helper class to get docker log line from a json based docker log + */ public class DockerLogFilter { private DockerLogFilter() { } + /** + * Convert json formatted docker log line to a simple log line. + * @param jsonInput docker log in json format + * @return log line + */ public static String getLogFromDockerJson(String jsonInput) { Map<String, Object> jsonMap = LogFeederUtil.toJSONObject(jsonInput); return jsonMap.get("log").toString(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java index 492e531..d72ead4 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java @@ -49,6 +49,34 @@ import java.util.Map; import java.util.Set; import java.util.regex.Pattern; +/** + * Parses lines, and split them to different fields based on grok expressions. Can be used with docker inputs as well. (with {@link DockerLogFilter}) + * Example configuration (see message_pattern and multiline_pattern): + * <pre> + * "filter": [ + * { + * "filter": "grok", + * "conditions": { + * "fields": { + * "type": [ + * "logsearch_server" + * ] + * } + * }, + * "log4j_format": "", + * "multiline_pattern": "^(%{DATESTAMP:logtime})", + * "message_pattern": "(?m)^%{DATESTAMP:logtime}%{SPACE}\\[%{DATA:thread_name}\\]%{SPACE}%{LOGLEVEL:level}%{SPACE}%{JAVACLASS}%{SPACE}\\(%{JAVAFILE:file}:%{INT:line_number}\\)%{SPACE}-%{SPACE}%{GREEDYDATA:log_message}", + * "post_map_values": { + * "logtime": { + * "map_date": { + * "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS" + * } + * } + * } + * } + * ] + * </pre> + */ public class FilterGrok extends Filter<LogFeederProps> { private static final Logger logger = LogManager.getLogger(FilterGrok.class); @@ -91,7 +119,7 @@ public class FilterGrok extends Filter<LogFeederProps> { skipOnError = ((FilterGrokDescriptor) getFilterDescriptor()).isSkipOnError(); if (logFeederProps.isDockerContainerRegistryEnabled()) { Input input = getInput(); - if (input != null && input instanceof InputFile) { + if (input instanceof InputFile) { dockerEnabled = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor) input.getInputDescriptor()).getDockerEnabled(), false); } } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java index b7cf55d..2cf412d 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java @@ -30,6 +30,25 @@ import org.apache.logging.log4j.Logger; import java.util.Map; +/** + * Filter lines in JSON format, if the logs are produced with a Log Search JSON layout appender, Log Feeder won't need + * to parse and split lines, that would mean better performance on Log Feeder side. + * Example configuration: + * <pre> + * "filter": [ + * { + * "filter": "json", + * "conditions": { + * "fields": { + * "type": [ + * "logsearch_app" + * ] + * } + * } + * } + * ] + * </pre> + */ public class FilterJSON extends Filter<LogFeederProps> { private static final Logger logger = LogManager.getLogger(FilterJSON.class); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java index 64f3763..b4153c7 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java @@ -35,6 +35,34 @@ import java.util.List; import java.util.Map; import java.util.regex.Pattern; +/** + * Filter for parsing lines as key value pairs (it is required to provide delimiters for splitting values/fields and borders as well) + * Example configuration: (input: "User(admin), RemoteIp(10.0.0.1)") + * <pre> + * "filter": [ + * "filter": "keyvalue", + * "sort_order": 1, + * "conditions": { + * "fields": { + * "type": [ + * "ambari_audit" + * ] + * } + * }, + * "source_field": "log_message", + * "field_split": ", ", + * "value_borders": "()", + * "post_map_values": { + * "User": { + * "map_field_value": { + * "pre_value": "null", + * "post_value": "unknown" + * } + * } + * } + * ] + * </pre> + */ public class FilterKeyValue extends Filter<LogFeederProps> { private static final Logger logger = LogManager.getLogger(FilterKeyValue.class); @@ -59,7 +87,6 @@ public class FilterKeyValue extends Filter<LogFeederProps> { fieldSplit + ", " + getShortDescription()); if (StringUtils.isEmpty(sourceField)) { logger.fatal("source_field is not set for filter. Thiss filter will not be applied"); - return; } } @@ -84,8 +111,8 @@ public class FilterKeyValue extends Filter<LogFeederProps> { String[] tokens = keyValueString.split(splitPattern); for (String nv : tokens) { String[] nameValue = getNameValue(nv); - String name = nameValue != null && nameValue.length == 2 ? nameValue[0] : null; - String value = nameValue != null && nameValue.length == 2 ? nameValue[1] : null; + String name = nameValue.length == 2 ? nameValue[0] : null; + String value = nameValue.length == 2 ? nameValue[1] : null; if (name != null && value != null) { if (valueMap.containsKey(value)) { value = valueMap.get(value); @@ -123,7 +150,9 @@ public class FilterKeyValue extends Filter<LogFeederProps> { String value = keyValueString.substring(lastPos, pos).trim(); String valueId = "$VALUE" + (++valueNum); valueMap.put(valueId, value); - processed.append(valueSplit + valueId); + processed + .append(valueSplit) + .append(valueId); lastPos = pos + 1; } } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java index 12198ee..57f5b3d 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java @@ -36,6 +36,9 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; +/** + * Upload configs with config handler (if those do not exist in the config store) and cache them in order to not check them again + */ public class InputConfigUploader extends Thread { private static final Logger logger = LogManager.getLogger(InputConfigUploader.class); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java index 022dc01..b8eb5e9 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java @@ -32,8 +32,8 @@ import org.apache.ambari.logfeeder.plugin.input.Input; import org.apache.ambari.logfeeder.util.FileUtil; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor; -import org.apache.commons.lang.BooleanUtils; -import org.apache.commons.lang.ObjectUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; @@ -42,8 +42,16 @@ import org.apache.solr.common.util.Base64; import java.io.BufferedReader; import java.io.File; -import java.util.*; - +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Input file object holds input shipper configurations, and can be used to start threads to monitor specific input file. + * If used with wildcards path or in docker mode, it can start multiple threads. (docker: using multiple files based on docker labels, + * as it is possible to use the same label types on different containers on 1 host, wildcard: if a pattern can be matched on multiple files/folder, + * it will be needed to start multiple input threads) + */ public class InputFile extends Input<LogFeederProps, InputFileMarker, InputFileBaseDescriptor> { private static final Logger logger = LogManager.getLogger(InputFile.class); @@ -178,7 +186,7 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker, InputFileB throw new RuntimeException(e); } } - dockerLogFileUpdateMonitorThread = new Thread(new DockerLogFileUpdateMonitor((InputFile) this, pathUpdateIntervalMin, detachTimeMin), "docker_logfiles_updater=" + logType); + dockerLogFileUpdateMonitorThread = new Thread(new DockerLogFileUpdateMonitor(this, pathUpdateIntervalMin, detachTimeMin), "docker_logfiles_updater=" + logType); dockerLogFileUpdateMonitorThread.setDaemon(true); dockerLogFileUpdateMonitorThread.start(); } @@ -323,7 +331,7 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker, InputFileB } private void copyFiles(File[] files) { - boolean isCopyFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)getInputDescriptor()).getCopyFile(), false); + boolean isCopyFile = BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().getCopyFile(), false); if (isCopyFile && files != null) { for (File file : files) { try { @@ -340,6 +348,11 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker, InputFileB } } + /** + * Start docker input file thread - by copying the input object and its filters (and set the log file to a specific json path) + * @param dockerMetadata holds docker metadata that was gathered by docker commands + * @throws CloneNotSupportedException error if input object could not be cloned + */ public void startNewChildDockerInputFileThread(DockerMetadata dockerMetadata) throws CloneNotSupportedException { logger.info("Start docker child input thread - " + dockerMetadata.getLogPath()); InputFile clonedObject = (InputFile) this.clone(); @@ -356,6 +369,10 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker, InputFileB thread.start(); } + /** + * Stop docker input file thread + * @param logPathKey file key for docker log (json) + */ public void stopChildDockerInputFileThread(String logPathKey) { logger.info("Stop child input thread - " + logPathKey); String filePath = new File(logPathKey).getName(); @@ -371,6 +388,11 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker, InputFileB } } + /** + * Start a new child input - if more files can be defined by an input (using wildcards) - clone this input object and start threads one-by-one + * @param folderFileEntry folder that holds the file that needs to be monitored. + * @throws CloneNotSupportedException error if input object could not be cloned + */ public void startNewChildInputFileThread(Map.Entry<String, List<File>> folderFileEntry) throws CloneNotSupportedException { logger.info("Start child input thread - " + folderFileEntry.getKey()); InputFile clonedObject = (InputFile) this.clone(); @@ -422,6 +444,10 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker, InputFileB } } + /** + * Stop file input thread that was monitored by this class + * @param folderPathKey folder that contains input file that is monitored + */ public void stopChildInputFileThread(String folderPathKey) { logger.info("Stop child input thread - " + folderPathKey); String filePath = new File(getFilePath()).getName(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java index 91ffd5e..70e54d6 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java @@ -49,8 +49,6 @@ public class InputManagerImpl extends InputManager { private MetricData filesCountMetric = new MetricData("input.files.count", true); - private Thread inputIsReadyMonitor; - @Inject private DockerContainerRegistry dockerContainerRegistry; @@ -66,11 +64,7 @@ public class InputManagerImpl extends InputManager { @Override public void add(String serviceName, Input input) { - List<Input> inputList = inputs.get(serviceName); - if (inputList == null) { - inputList = new ArrayList<>(); - inputs.put(serviceName, inputList); - } + List<Input> inputList = inputs.computeIfAbsent(serviceName, k -> new ArrayList<>()); inputList.add(input); } @@ -82,7 +76,10 @@ public class InputManagerImpl extends InputManager { } for (Input input : inputList) { while (!input.isClosed()) { - try { Thread.sleep(100); } catch (InterruptedException e) {} + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } } } inputList.clear(); @@ -116,6 +113,7 @@ public class InputManagerImpl extends InputManager { return count; } + @SuppressWarnings("unchecked") @Override public void init() throws Exception { checkpointHandler.init(logFeederProps); @@ -132,7 +130,7 @@ public class InputManagerImpl extends InputManager { } private void startMonitorThread() { - inputIsReadyMonitor = new Thread("InputIsReadyMonitor") { + Thread inputIsReadyMonitor = new Thread("InputIsReadyMonitor") { @Override public void run() { logger.info("Going to monitor for these missing files: " + notReadyList.toString()); @@ -213,33 +211,6 @@ public class InputManagerImpl extends InputManager { // TODO: logStatForMetric(filesCountMetric, "Stat: Files Monitored Count", ""); } - public void waitOnAllInputs() { - //wait on inputs - for (List<Input> inputList : inputs.values()) { - for (Input input : inputList) { - if (input != null) { - Thread inputThread = input.getThread(); - if (inputThread != null) { - try { - inputThread.join(); - } catch (InterruptedException e) { - // ignore - } - } - } - } - } - // wait on monitor - if (inputIsReadyMonitor != null) { - try { - this.close(); - inputIsReadyMonitor.join(); - } catch (InterruptedException e) { - // ignore - } - } - } - public void checkInAll() { for (List<Input> inputList : inputs.values()) { for (Input input : inputList) { diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java index c4d5fb9..5191045 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java @@ -29,13 +29,18 @@ import org.apache.solr.common.util.Base64; import java.io.BufferedReader; import java.io.File; +/** + * Download file from S3 then start processing it. + */ public class InputS3File extends InputFile { private static final Logger logger = LogManager.getLogger(InputS3File.class); + private boolean ready = false; + @Override public boolean isReady() { - if (!isReady()) { + if (!ready) { // Let's try to check whether the file is available setLogFiles(getActualFiles(getLogPath())); if (!ArrayUtils.isEmpty(getLogFiles())) { @@ -49,7 +54,12 @@ public class InputS3File extends InputFile { logger.debug(getLogPath() + " file doesn't exist. Ignoring for now"); } } - return isReady(); + return ready; + } + + @Override + public void setReady(boolean ready) { + this.ready = ready; } private File[] getActualFiles(String searchPath) { diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java index 5609f61..0e534a4 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java @@ -45,6 +45,9 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; +/** + * Input type for simulating inputs for Log Feeder + */ public class InputSimulate extends InputFile { private static final Logger logger = LogManager.getLogger(InputSimulate.class); private static final String LOG_TEXT_PATTERN = "{ logtime=\"%d\", level=\"%s\", log_message=\"%s\", host=\"%s\"}"; diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java index 554923a..965aa84 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java @@ -37,6 +37,9 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; +/** + * Open a socket input (with specific port) to handle incoming messages (as serialized log objects or simple messages) + */ public class InputSocket extends Input<LogFeederProps, InputSocketMarker, InputSocketDescriptor> { private static final Logger logger = LogManager.getLogger(InputSocket.class); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ProcessFileHelper.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ProcessFileHelper.java index e3d34cd..b9347f1 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ProcessFileHelper.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ProcessFileHelper.java @@ -28,6 +28,9 @@ import org.apache.logging.log4j.Logger; import java.io.BufferedReader; import java.io.File; +/** + * Helper for input file processing (open files, read line and pass them to filters and output(s)) + */ public class ProcessFileHelper { private static final Logger logger = LogManager.getLogger(ProcessFileHelper.class); @@ -35,6 +38,13 @@ public class ProcessFileHelper { private ProcessFileHelper() { } + /** + * Process input (apply on filters then send to an output) log file(s) until EOF - stop processing onlu + * @param inputFile input file descriptor + * @param logPathFile input file object + * @param follow if is is set the processing won't stop at EOF + * @throws Exception error during file processing + */ public static void processFile(InputFile inputFile, File logPathFile, boolean follow) throws Exception { logger.info("Monitoring logPath=" + inputFile.getLogPath() + ", logPathFile=" + logPathFile); BufferedReader br = null; diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/FileCheckpointManager.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/FileCheckpointManager.java index bdd775a..9d096cb 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/FileCheckpointManager.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/FileCheckpointManager.java @@ -37,6 +37,9 @@ import java.util.Map; import java.util.UUID; import java.util.stream.Stream; +/** + * Handle checkpoints (as JSON files) for inputs. + */ public class FileCheckpointManager implements CheckpointManager<InputFile, InputFileMarker, LogFeederProps> { private static final Logger logger = LogManager.getLogger(FileCheckpointManager.class); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/CheckpointFileReader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/CheckpointFileReader.java index dd35d07..da09e3d 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/CheckpointFileReader.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/CheckpointFileReader.java @@ -30,17 +30,32 @@ import java.io.RandomAccessFile; import java.lang.reflect.Type; import java.util.Map; +/** + * Utility class for reading checkpoint JSON files + */ public class CheckpointFileReader { private CheckpointFileReader() { } + /** + * Get all checkpoint files + * @param checkPointFolderFile folder where + * @param checkPointExtension checkpoint file extension, e.g.: .cp + * @return array of checkpoint files + */ public static File[] getFiles(File checkPointFolderFile, String checkPointExtension) { String searchPath = "*" + checkPointExtension; FileFilter fileFilter = new WildcardFileFilter(searchPath); return checkPointFolderFile.listFiles(fileFilter); } + /** + * Obtain checkpoint file object from a file + * @param checkPointFile checkpoint file object + * @return checkpoint file object details as key / value pairs + * @throws IOException error during gather checkpoint file object from a file + */ public static Map<String, String> getCheckpointObject(File checkPointFile) throws IOException { final Map<String, String> jsonCheckPoint; try (RandomAccessFile checkPointReader = new RandomAccessFile(checkPointFile, "r")) { diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckInHelper.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckInHelper.java index 2b52661..354f933 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckInHelper.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckInHelper.java @@ -31,6 +31,9 @@ import java.io.RandomAccessFile; import java.util.Date; import java.util.Map; +/** + * Utility class to save input details in a checkpoint file + */ public class FileCheckInHelper { private static final Logger logger = LogManager.getLogger(FileCheckInHelper.class); @@ -38,6 +41,11 @@ public class FileCheckInHelper { private FileCheckInHelper() { } + /** + * Save input details to a checkpoint files + * @param inputFile input file object that is processed + * @param inputMarker input file details holder object + */ public static void checkIn(InputFile inputFile, InputFileMarker inputMarker) { try { Map<String, Object> jsonCheckPoint = inputFile.getJsonCheckPoints().get(inputMarker.getBase64FileKey()); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckpointCleanupHelper.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckpointCleanupHelper.java index d38d14d..da54a21 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckpointCleanupHelper.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/FileCheckpointCleanupHelper.java @@ -29,6 +29,9 @@ import java.io.File; import java.io.RandomAccessFile; import java.util.Map; +/** + * Utility class to cleanup checkpoint files. + */ public class FileCheckpointCleanupHelper { private static final Logger logger = LogManager.getLogger(FileCheckpointCleanupHelper.class); @@ -36,6 +39,11 @@ public class FileCheckpointCleanupHelper { private FileCheckpointCleanupHelper() { } + /** + * Cleanup checkpoint files in the checkpoint folder if required. + * @param checkPointFolderFile checkpoint file folder that contains the checkpoint files + * @param checkPointExtension checkpoint file extension. e.g: .cp + */ public static void cleanCheckPointFiles(File checkPointFolderFile, String checkPointExtension) { if (checkPointFolderFile == null) { logger.info("Will not clean checkPoint files. checkPointFolderFile=null"); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/ResumeLineNumberHelper.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/ResumeLineNumberHelper.java index 66c686c..489e82c 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/ResumeLineNumberHelper.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/checkpoint/util/ResumeLineNumberHelper.java @@ -29,6 +29,9 @@ import java.io.RandomAccessFile; import java.util.HashMap; import java.util.Map; +/** + * Utility class to get last processed line number from a checkpoint file. + */ public class ResumeLineNumberHelper { private static final Logger logger = LogManager.getLogger(ResumeLineNumberHelper.class); @@ -36,6 +39,12 @@ public class ResumeLineNumberHelper { private ResumeLineNumberHelper() { } + /** + * Get last processed line number from a checkpoint file for an input + * @param inputFile input file object + * @param checkPointFolder checkpoint folder that contains + * @return last processed line number of an input file + */ public static int getResumeFromLineNumber(InputFile inputFile, File checkPointFolder) { int resumeFromLineNumber = 0; diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java index a41a257..edf90e9 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java @@ -22,6 +22,9 @@ import org.apache.ambari.logfeeder.input.InputFile; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +/** + * Holds common file monitoring operations: start a thread that periodically do something which can be implemented by subclasses. + */ public abstract class AbstractLogFileMonitor implements Runnable { private static final Logger LOG = LogManager.getLogger(AbstractLogFileMonitor.class); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java index 28bf401..91e3c8f 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java @@ -22,6 +22,9 @@ import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +/** + * Periodically execute a cleanup on checkpoints. (E.g.: it can delete a checkpoint file if it is too old) + */ public class CheckpointCleanupMonitor implements Runnable { private static final Logger logger = LogManager.getLogger(CheckpointCleanupMonitor.class); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/DockerLogFileUpdateMonitor.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/DockerLogFileUpdateMonitor.java index 859e6e0..63e4f2e 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/DockerLogFileUpdateMonitor.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/DockerLogFileUpdateMonitor.java @@ -31,14 +31,15 @@ import java.util.Map; /** * Periodically check docker containers metadata registry, stop monitoring container log files if those do not exist or stopped too long time ago. * If it finds a new container log for the specific type, it will start to monitoring it. - * <br/> - * Use cases:<br/> - * - input has not monitored yet - found new container -> start monitoring it <br/> - * - input has not monitored yet - found new stopped container -> start monitoring it <br/> - * - input has not monitored yet - found new stopped container but log is too old -> do not monitoring it <br/> - * - input has monitored already - container stopped - if it's stopped for too long time -> remove it from the monitoed list<br/> - * - input has monitored already - container stopped - log is not too old -> keep in the monitored list <br/> - * - input has monitored already - container does not exist - remove it from the monitoed list (and all other input with the same log type) <br/> + * <pre> + * Use cases: + * - input has not monitored yet - found new container: start monitoring it + * - input has not monitored yet - found new stopped container: start monitoring it + * - input has not monitored yet - found new stopped container but log is too old: do not monitoring it + * - input has monitored already - container stopped - if it's stopped for too long time : remove it from the monitored list + * - input has monitored already - container stopped - log is not too old: keep in the monitored list + * - input has monitored already - container does not exist - remove it from the monitored list (and all other input with the same log type) + * </pre> */ public class DockerLogFileUpdateMonitor extends AbstractLogFileMonitor { diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java index 4f5b516..ca6697e 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java @@ -28,6 +28,9 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.util.zip.GZIPInputStream; +/** + * Reader class that can read gzip input streams. + */ class GZIPReader extends InputStreamReader { private static final Logger logger = LogManager.getLogger(GZIPReader.class); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java index c5453df..77097ef 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java @@ -26,10 +26,19 @@ import java.io.FileNotFoundException; import java.io.FileReader; import java.io.Reader; +/** + * Factory for reading file inputs + */ public enum LogsearchReaderFactory { INSTANCE; private static final Logger logger = LogManager.getLogger(LogsearchReaderFactory.class); + /** + * Get a reader for a file based (for simple text file or for gzipped file) + * @param file input file to read + * @return file reader object + * @throws FileNotFoundException error that happens if file is not found + */ public Reader getReader(File file) throws FileNotFoundException { logger.debug("Inside reader factory for file:" + file); if (GZIPReader.isValidFile(file.getAbsolutePath())) { diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java index 3b9f421..a1780cf 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java @@ -48,6 +48,9 @@ import java.util.TimeZone; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +/** + * Manage log level filter object and cache them. (in memory) + */ public class LogLevelFilterHandler implements LogLevelFilterMonitor { private static final Logger logger = LogManager.getLogger(LogLevelFilterHandler.class); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java index 652917f..281c2ee 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java @@ -33,6 +33,22 @@ import org.apache.logging.log4j.Logger; import java.util.Map; +/** + * Field mapper to anonymize fields if it has match with on a specific pattern. + * Example: + * <pre> + * "post_map_values": { + * "message_field_with_password": [ + * { + * "map_anonymize": { + * "pattern": "password: *." + * "hide_char: "*" + * } + * } + * ] + * } + * </pre> + */ public class MapperAnonymize extends Mapper<LogFeederProps> { private static final Logger logger = LogManager.getLogger(MapperAnonymize.class); @@ -43,7 +59,7 @@ public class MapperAnonymize extends Mapper<LogFeederProps> { private char hideChar; @Override - public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { + public boolean init(LogFeederProps logFeederProps, String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { init(inputDesc, fieldName, mapClassCode); pattern = ((MapAnonymizeDescriptor)mapFieldDescriptor).getPattern(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java index 14ecc33..663ff19 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java @@ -37,6 +37,19 @@ import java.util.Calendar; import java.util.Date; import java.util.Map; +/** + * Map date string from string format to date with providing date patterns. (source and target patterns can be provided) + * <pre> + * "post_map_values": { + * "logtime": { + * "map_date": { + * "target_date_pattern": "yyyy-MM-dd HH:mm:ss,SSS", + * "src_date_pattern" :"MMM dd HH:mm:ss" + * } + * } + * } + * </pre> + */ public class MapperDate extends Mapper<LogFeederProps> { private static final Logger logger = LogManager.getLogger(MapperDate.class); @@ -45,7 +58,7 @@ public class MapperDate extends Mapper<LogFeederProps> { private FastDateFormat srcDateFormatter=null; @Override - public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { + public boolean init(LogFeederProps logFeederProps, String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { init(inputDesc, fieldName, mapClassCode); String targetDateFormat = ((MapDateDescriptor)mapFieldDescriptor).getTargetDatePattern(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java index ca164d5..3337b50 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java @@ -30,7 +30,17 @@ import org.apache.logging.log4j.Logger; import java.util.Map; /** - * Overrides the value for the field + * Copy field to an another field + * <pre> + * "post_map_values": { + * "Status": [ + * { + * "map_field_copy": { + * "copy_name": "ws_status_copied" + * } + * } + * } + * </pre> */ public class MapperFieldCopy extends Mapper<LogFeederProps> { private static final Logger logger = LogManager.getLogger(MapperFieldCopy.class); @@ -38,7 +48,7 @@ public class MapperFieldCopy extends Mapper<LogFeederProps> { private String copyName = null; @Override - public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { + public boolean init(LogFeederProps logFeederProps, String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { init(inputDesc, fieldName, mapClassCode); copyName = ((MapFieldCopyDescriptor)mapFieldDescriptor).getCopyName(); if (StringUtils.isEmpty(copyName)) { diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java index dce4e7c..9a2fc4d 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java @@ -32,7 +32,17 @@ import org.apache.logging.log4j.Logger; import java.util.Map; /** - * Overrides the value for the field + * Overrides the name of a field + * <pre> + * "post_map_values": { + * "Status": [ + * { + * "map_field_name": { + * "new_field_name": "ws_status" + * } + * } + * } + * </pre> */ public class MapperFieldName extends Mapper<LogFeederProps> { private static final Logger logger = LogManager.getLogger(MapperFieldName.class); @@ -40,7 +50,7 @@ public class MapperFieldName extends Mapper<LogFeederProps> { private String newValue = null; @Override - public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { + public boolean init(LogFeederProps logFeederProps, String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { init(inputDesc, fieldName, mapClassCode); newValue = ((MapFieldNameDescriptor)mapFieldDescriptor).getNewFieldName(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java index 3c2fc06..3dcffc7 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java @@ -33,6 +33,17 @@ import java.util.Map; /** * Overrides the value for the field + * <pre> + * "post_map_values": { + * "Result": [ + * { + * 'map_field_value': { + * 'pre_value': 'true', + * 'post_value': '1' + * } + * } + * } + * </pre> */ public class MapperFieldValue extends Mapper<LogFeederProps> { private static final Logger logger = LogManager.getLogger(MapperFieldValue.class); @@ -41,7 +52,7 @@ public class MapperFieldValue extends Mapper<LogFeederProps> { private String newValue = null; @Override - public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { + public boolean init(LogFeederProps logFeederProps, String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { init(inputDesc, fieldName, mapClassCode); prevValue = ((MapFieldValueDescriptor)mapFieldDescriptor).getPreValue(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java index 910d4d6..7dae1b8 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java @@ -90,7 +90,7 @@ public class OutputFile extends Output<LogFeederProps, InputFileMarker> { // Ignore this exception } } - setClosed(true); + shouldCloseOutput(); } @Override diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java index 13bb772..ed93aa4 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java @@ -103,7 +103,7 @@ public class OutputHDFSFile extends Output<LogFeederProps, InputFileMarker> impl logger.info("Closing file." + getShortDescription()); logSpooler.rollover(); this.stopHDFSCopyThread(); - setClosed(true); + shouldCloseOutput(); } @Override diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java index f10cb9b..49765f5 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java @@ -36,6 +36,9 @@ public class OutputLineFilter { /** * Applies filter based on input cache (on service log only). * Get the message and in-memory timestamp for log line. If both are not empty, evaluate that log line needs to be filtered out or not. + * @param lineMap holds output fields and values (as key/value pairs) + * @param input holds input object + * @return log filtered out or not */ public Boolean apply(Map<String, Object> lineMap, Input input) { boolean isLogFilteredOut = false; @@ -56,9 +59,7 @@ public class OutputLineFilter { } } } - if (lineMap.containsKey(LogFeederConstants.IN_MEMORY_TIMESTAMP)) { - lineMap.remove(LogFeederConstants.IN_MEMORY_TIMESTAMP); - } + lineMap.remove(LogFeederConstants.IN_MEMORY_TIMESTAMP); return isLogFilteredOut; } } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java index 595a738..68db96a 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java @@ -30,7 +30,6 @@ import org.apache.ambari.logfeeder.plugin.input.InputMarker; import org.apache.ambari.logfeeder.plugin.manager.OutputManager; import org.apache.ambari.logfeeder.plugin.output.Output; import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.ambari.logsearch.config.api.OutputConfigMonitor; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.Level; @@ -51,8 +50,6 @@ public class OutputManagerImpl extends OutputManager { private List<Output> outputs = new ArrayList<>(); - private boolean addMessageMD5 = true; - private static long docCounter = 0; private MetricData messageTruncateMetric = new MetricData(null, false); @@ -68,20 +65,11 @@ public class OutputManagerImpl extends OutputManager { return outputs; } - public List<? extends OutputConfigMonitor> getOutputsToMonitor() { - List<Output> outputsToMonitor = new ArrayList<>(); - for (Output output : outputs) { - if (output.monitorConfigChanges()) { - outputsToMonitor.add(output); - } - } - return outputsToMonitor; - } - public void add(Output output) { this.outputs.add(output); } + @SuppressWarnings("unchecked") @Override public void init() throws Exception { for (Output output : outputs) { @@ -89,6 +77,7 @@ public class OutputManagerImpl extends OutputManager { } } + @SuppressWarnings("unchecked") public void write(Map<String, Object> jsonObj, InputMarker inputMarker) { Input input = inputMarker.getInput(); @@ -117,19 +106,17 @@ public class OutputManagerImpl extends OutputManager { byte[] bytes = LogFeederUtil.getGson().toJson(jsonObj).getBytes(); - Long eventMD5 = Hashing.md5().hashBytes(bytes).asLong(); + long eventMD5 = Hashing.md5().hashBytes(bytes).asLong(); if (input.isGenEventMD5()) { - jsonObj.put("event_md5", prefix + eventMD5.toString()); + jsonObj.put("event_md5", prefix + Long.toString(eventMD5)); } if (input.isUseEventMD5()) { - jsonObj.put("id", prefix + eventMD5.toString()); + jsonObj.put("id", prefix + Long.toString(eventMD5)); } } - jsonObj.put("seq_num", new Long(docCounter++)); - if (jsonObj.get("event_count") == null) { - jsonObj.put("event_count", new Integer(1)); - } + jsonObj.put("seq_num", docCounter++); + jsonObj.computeIfAbsent("event_count", k -> 1); if (StringUtils.isNotBlank(input.getInputDescriptor().getGroup())) { jsonObj.put("group", input.getInputDescriptor().getGroup()); } @@ -141,9 +128,7 @@ public class OutputManagerImpl extends OutputManager { // TODO: Let's check size only for log_message for now String logMessage = (String) jsonObj.get("log_message"); logMessage = truncateLongLogMessage(jsonObj, input, logMessage); - if (addMessageMD5) { - jsonObj.put("message_md5", "" + Hashing.md5().hashBytes(logMessage.getBytes()).asLong()); - } + jsonObj.put("message_md5", "" + Hashing.md5().hashBytes(logMessage.getBytes()).asLong()); } List<String> defaultLogLevels = getDefaultLogLevels(input); if (logLevelFilterHandler.isAllowed(jsonObj, inputMarker, defaultLogLevels) @@ -192,6 +177,7 @@ public class OutputManagerImpl extends OutputManager { return logMessage; } + @SuppressWarnings("unchecked") public void write(String jsonBlock, InputMarker inputMarker) { List<String> defaultLogLevels = getDefaultLogLevels(inputMarker.getInput()); if (logLevelFilterHandler.isAllowed(jsonBlock, inputMarker, defaultLogLevels)) { @@ -206,6 +192,7 @@ public class OutputManagerImpl extends OutputManager { } } + @SuppressWarnings("unchecked") public void copyFile(File inputFile, InputMarker inputMarker) { Input input = inputMarker.getInput(); List<? extends Output> outputList = input.getOutputList(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java index 38a2937..7d7e6af 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java @@ -189,10 +189,9 @@ public class OutputS3File extends OutputFile implements RolloverCondition, Rollo * @param block The log event to upload * @param inputMarker Contains information about the log file feeding the lines. - * @throws Exception */ @Override - public void write(String block, InputFileMarker inputMarker) throws Exception { + public void write(String block, InputFileMarker inputMarker) { if (logSpooler == null) { if (inputMarker.getInput().getClass().isAssignableFrom(InputFile.class)) { InputFile input = (InputFile) inputMarker.getInput(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java index 5d8e59d..dd89e0a 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java @@ -56,6 +56,34 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +/** + * Ship (transformed) input data to solr destination. Works with both solr cloud mode or providing static solr url(s). + * In Solr cloud mode Log Feeder will manage and listen a ZooKeeper connection. If there are too many Log Feeder nodes that can mean + * it requires a lot of client connections. (for static urls, use "solr_urls" field, for Solr cloud mode use "zk_connect_string") + * Example configuration (using JSON config api): + * <pre> + * { + * "output": [ + * { + * "is_enabled": "true", + * "comment": "Output to solr for service logs", + * "collection" : "hadoop_logs", + * "destination": "solr", + * "zk_connect_string": "localhost:9983", + * "type": "service", + * "skip_logtime": "true", + * "conditions": { + * "fields": { + * "rowtype": [ + * "service" + * ] + * } + * } + * } + * ] + * } + * </pre> + */ public class OutputSolr extends Output<LogFeederProps, InputMarker> { private static final Logger logger = LogManager.getLogger(OutputSolr.class); @@ -74,7 +102,6 @@ public class OutputSolr extends Output<LogFeederProps, InputMarker> { private String type; private String collection; - private String splitMode; private int splitInterval; private String zkConnectString; private String[] solrUrls = null; @@ -92,11 +119,6 @@ public class OutputSolr extends Output<LogFeederProps, InputMarker> { private LogFeederProps logFeederProps; @Override - public boolean monitorConfigChanges() { - return true; - }; - - @Override public String getOutputType() { return type; } @@ -142,7 +164,7 @@ public class OutputSolr extends Output<LogFeederProps, InputMarker> { workers = getIntValue("workers", DEFAULT_NUMBER_OF_WORKERS); splitInterval = 0; - splitMode = getStringValue("split_interval", "none"); + String splitMode = getStringValue("split_interval", "none"); if (!splitMode.equals("none")) { splitInterval = Integer.parseInt(splitMode); } @@ -498,7 +520,7 @@ public class OutputSolr extends Output<LogFeederProps, InputMarker> { } @Override - public void write(String block, InputMarker inputMarker) throws Exception { + public void write(String block, InputMarker inputMarker) { } @Override diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java index 07b5f4f..1135eea 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java @@ -76,6 +76,12 @@ public class S3Util { /** * Get the buffer reader to read s3 file as a stream + * @param s3Path s3 specific path + * @param s3Endpoint url of an s3 server + * @param accessKey s3 access key - pass by an input shipper configuration + * @param secretKey s3 secret key - pass by an input shipper configuration + * @return buffered reader object which can be used to read s3 file object + * @throws Exception error that happens during reading s3 file */ public static BufferedReader getReader(String s3Path, String s3Endpoint, String accessKey, String secretKey) throws Exception { // TODO error handling diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperAnonymizeTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperAnonymizeTest.java index 7953615..32fb16b 100644 --- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperAnonymizeTest.java +++ b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperAnonymizeTest.java @@ -37,7 +37,7 @@ public class MapperAnonymizeTest { mapAnonymizeDescriptorImpl.setPattern("secret <hide> / <hide> is here"); MapperAnonymize mapperAnonymize = new MapperAnonymize(); - assertTrue("Could not initialize!", mapperAnonymize.init(null, "someField", null, mapAnonymizeDescriptorImpl)); + assertTrue("Could not initialize!", mapperAnonymize.init(null, null, "someField", null, mapAnonymizeDescriptorImpl)); Map<String, Object> jsonObj = new HashMap<>(); mapperAnonymize.apply(jsonObj, "something else secret SECRET1 / SECRET2 is here something else 2"); @@ -54,7 +54,7 @@ public class MapperAnonymizeTest { mapAnonymizeDescriptorImpl.setHideChar('X'); MapperAnonymize mapperAnonymize = new MapperAnonymize(); - assertTrue("Could not initialize!", mapperAnonymize.init(null, "someField", null, mapAnonymizeDescriptorImpl)); + assertTrue("Could not initialize!", mapperAnonymize.init(null,null, "someField", null, mapAnonymizeDescriptorImpl)); Map<String, Object> jsonObj = new HashMap<>(); mapperAnonymize.apply(jsonObj, "something else SECRET1 / SECRET2 is the secret something else 2"); @@ -69,6 +69,6 @@ public class MapperAnonymizeTest { MapAnonymizeDescriptorImpl mapAnonymizeDescriptorImpl = new MapAnonymizeDescriptorImpl(); MapperAnonymize mapperAnonymize = new MapperAnonymize(); - assertFalse("Was not able to initialize!", mapperAnonymize.init(null, "someField", null, mapAnonymizeDescriptorImpl)); + assertFalse("Was not able to initialize!", mapperAnonymize.init(null, null, "someField", null, mapAnonymizeDescriptorImpl)); } } diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java index d0643ea..59f43d3 100644 --- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java +++ b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java @@ -42,7 +42,7 @@ public class MapperDateTest { mapDateDescriptor.setTargetDatePattern("epoch"); MapperDate mapperDate = new MapperDate(); - assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapDateDescriptor)); + assertTrue("Could not initialize!", mapperDate.init(null, null, "someField", null, mapDateDescriptor)); Map<String, Object> jsonObj = new HashMap<>(); @@ -62,7 +62,7 @@ public class MapperDateTest { mapDateDescriptor.setTargetDatePattern("yyyy-MM-dd HH:mm:ss.SSS"); MapperDate mapperDate = new MapperDate(); - assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapDateDescriptor)); + assertTrue("Could not initialize!", mapperDate.init(null, null, "someField", null, mapDateDescriptor)); Map<String, Object> jsonObj = new HashMap<>(); String dateString = "2016-04-08 15:55:23.548"; @@ -82,7 +82,7 @@ public class MapperDateTest { MapDateDescriptorImpl mapDateDescriptor = new MapDateDescriptorImpl(); MapperDate mapperDate = new MapperDate(); - assertFalse("Was not able to initialize!", mapperDate.init(null, "someField", null, mapDateDescriptor)); + assertFalse("Was not able to initialize!", mapperDate.init(null, null, "someField", null, mapDateDescriptor)); } @Test @@ -92,7 +92,7 @@ public class MapperDateTest { mapDateDescriptor.setTargetDatePattern("not_parsable_content"); MapperDate mapperDate = new MapperDate(); - assertFalse("Was not able to initialize!", mapperDate.init(null, "someField", null, mapDateDescriptor)); + assertFalse("Was not able to initialize!", mapperDate.init(null, null, "someField", null, mapDateDescriptor)); } @Test @@ -102,7 +102,7 @@ public class MapperDateTest { mapDateDescriptor.setTargetDatePattern("epoch"); MapperDate mapperDate = new MapperDate(); - assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapDateDescriptor)); + assertTrue("Could not initialize!", mapperDate.init(null, null, "someField", null, mapDateDescriptor)); Map<String, Object> jsonObj = new HashMap<>(); String invalidValue = "abc"; @@ -119,7 +119,7 @@ public class MapperDateTest { mapDateDescriptor.setTargetDatePattern("yyyy-MM-dd HH:mm:ss.SSS"); MapperDate mapperDate = new MapperDate(); - assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapDateDescriptor)); + assertTrue("Could not initialize!", mapperDate.init(null, null, "someField", null, mapDateDescriptor)); Map<String, Object> jsonObj = new HashMap<>(); String invalidValue = "abc"; diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopyTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopyTest.java index 32c8b99..88070f3 100644 --- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopyTest.java +++ b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopyTest.java @@ -37,7 +37,7 @@ public class MapperFieldCopyTest { mapFieldCopyDescriptor.setCopyName("someOtherField"); MapperFieldCopy mapperFieldCopy = new MapperFieldCopy(); - assertTrue("Could not initialize!", mapperFieldCopy.init(null, "someField", null, mapFieldCopyDescriptor)); + assertTrue("Could not initialize!", mapperFieldCopy.init(null, null, "someField", null, mapFieldCopyDescriptor)); Map<String, Object> jsonObj = new HashMap<>(); jsonObj.put("someField", "someValue"); @@ -55,6 +55,6 @@ public class MapperFieldCopyTest { MapFieldCopyDescriptorImpl mapFieldCopyDescriptor = new MapFieldCopyDescriptorImpl(); MapperFieldCopy mapperFieldCopy = new MapperFieldCopy(); - assertFalse("Was not able to initialize!", mapperFieldCopy.init(null, "someField", null, mapFieldCopyDescriptor)); + assertFalse("Was not able to initialize!", mapperFieldCopy.init(null, null, "someField", null, mapFieldCopyDescriptor)); } } diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java index b1dbd4d..a2ec6ee 100644 --- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java +++ b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java @@ -37,7 +37,7 @@ public class MapperFieldNameTest { mapFieldNameDescriptor.setNewFieldName("someOtherField"); MapperFieldName mapperFieldName = new MapperFieldName(); - assertTrue("Could not initialize!", mapperFieldName.init(null, "someField", null, mapFieldNameDescriptor)); + assertTrue("Could not initialize!", mapperFieldName.init(null, null, "someField", null, mapFieldNameDescriptor)); Map<String, Object> jsonObj = new HashMap<>(); jsonObj.put("someField", "someValue"); @@ -55,6 +55,6 @@ public class MapperFieldNameTest { MapFieldNameDescriptorImpl mapFieldNameDescriptor = new MapFieldNameDescriptorImpl(); MapperFieldName mapperFieldName = new MapperFieldName(); - assertFalse("Was able to initialize!", mapperFieldName.init(null, "someField", null, mapFieldNameDescriptor)); + assertFalse("Was able to initialize!", mapperFieldName.init(null, null, "someField", null, mapFieldNameDescriptor)); } } diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java index 1c82a1e..42302ca 100644 --- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java +++ b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java @@ -38,7 +38,7 @@ public class MapperFieldValueTest { mapFieldValueDescriptor.setPostValue("someOtherValue"); MapperFieldValue mapperFieldValue = new MapperFieldValue(); - assertTrue("Could not initialize!", mapperFieldValue.init(null, "someField", null, mapFieldValueDescriptor)); + assertTrue("Could not initialize!", mapperFieldValue.init(null,null, "someField", null, mapFieldValueDescriptor)); Map<String, Object> jsonObj = new HashMap<>(); @@ -55,7 +55,7 @@ public class MapperFieldValueTest { MapFieldValueDescriptorImpl mapFieldValueDescriptor = new MapFieldValueDescriptorImpl(); MapperFieldValue mapperFieldValue = new MapperFieldValue(); - assertFalse("Was not able to initialize!", mapperFieldValue.init(null, "someField", null, mapFieldValueDescriptor)); + assertFalse("Was not able to initialize!", mapperFieldValue.init(null, null, "someField", null, mapFieldValueDescriptor)); } @Test @@ -66,7 +66,7 @@ public class MapperFieldValueTest { mapFieldValueDescriptor.setPostValue("someOtherValue"); MapperFieldValue mapperFieldValue = new MapperFieldValue(); - assertTrue("Could not initialize!", mapperFieldValue.init(null, "someField", null, mapFieldValueDescriptor)); + assertTrue("Could not initialize!", mapperFieldValue.init(null, null, "someField", null, mapFieldValueDescriptor)); Map<String, Object> jsonObj = new HashMap<>(); diff --git a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/ExternalServerClient.java b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/ExternalServerClient.java index 4247bc0..31ab03e 100644 --- a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/ExternalServerClient.java +++ b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/ExternalServerClient.java @@ -50,17 +50,18 @@ public class ExternalServerClient { /** * Send GET request to an external server + * @param loginUrl external url + * @param classObject response object type + * @param username basic auth credential user + * @param password basic auth credential password + * @return response + * @throws Exception error during send request to external location */ - public Object sendGETRequest(String loginUrl, Class<?> klass, String username, String password) throws Exception { + public Object sendGETRequest(String loginUrl, Class<?> classObject, String username, String password) throws Exception { if (localJerseyClient == null) { - localJerseyClient = new ThreadLocal<JerseyClient>() { - @Override - protected JerseyClient initialValue() { - return sslConfigurer.isKeyStoreSpecified() ? - new JerseyClientBuilder().sslContext(sslConfigurer.getSSLContext()).build() : - JerseyClientBuilder.createClient(); - } - }; + localJerseyClient = ThreadLocal.withInitial(() -> sslConfigurer.isKeyStoreSpecified() ? + new JerseyClientBuilder().sslContext(sslConfigurer.getSSLContext()).build() : + JerseyClientBuilder.createClient()); } String url = authPropsConfig.getExternalAuthHostUrl() + loginUrl; JerseyClient client = localJerseyClient.get(); @@ -80,7 +81,7 @@ public class ExternalServerClient { throw new InvalidCredentialsException(String.format("External auth failed with status code: %d, response: %s", response.getStatus(), response.readEntity(String.class))); } - return response.readEntity(klass); + return response.readEntity(classObject); } catch (Exception e) { throw new Exception(e.getCause()); } finally { diff --git a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/LogSearchLdapAuthorityMapper.java b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/LogSearchLdapAuthorityMapper.java index e5d6d7f..6672d5b 100644 --- a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/LogSearchLdapAuthorityMapper.java +++ b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/LogSearchLdapAuthorityMapper.java @@ -31,10 +31,9 @@ import java.util.Map; /** * Class to map multiple LDAP groups to Log Search authorities. (definied in a map) * Examples: - * LDAP person -> ROLE_USER - * LDAP user -> ROLE_USER - * LDAP admin -> ROLE_ADMIN - * ROLE_LDAP_ADMIN -> ROLE_ADMIN + * person - ROLE_USER + * user - ROLE_USER + * admin - ROLE_ADMIN */ public class LogSearchLdapAuthorityMapper implements GrantedAuthoritiesMapper { diff --git a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/RoleDao.java b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/RoleDao.java index bb8f589..b0e2642 100644 --- a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/RoleDao.java +++ b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/RoleDao.java @@ -109,6 +109,7 @@ public class RoleDao { /** * Helper function to create a simple default role details + * @return list of authorities */ public static List<GrantedAuthority> createDefaultAuthorities() { Role r = createRoleWithReadPrivilage("ROLE_USER"); diff --git a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/AbstractSolrConfigHandler.java b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/AbstractSolrConfigHandler.java index dff06e3..2f4b3d8 100644 --- a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/AbstractSolrConfigHandler.java +++ b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/AbstractSolrConfigHandler.java @@ -67,12 +67,20 @@ public abstract class AbstractSolrConfigHandler implements SolrZkRequestHandler< /** * Update config file (like solrconfig.xml) to zookeeper znode of solr + * @param solrPropsConfig hold global solr configurations + * @param zkClient zk client of the solr client + * @param file that needs to be uploaded to zookeeper + * @param separator file separator + * @param content file content + * @return true if upload was successful (or can be skipped) + * @throws IOException error during file uploading */ public abstract boolean updateConfigIfNeeded(SolrPropsConfig solrPropsConfig, SolrZkClient zkClient, File file, String separator, byte[] content) throws IOException; /** - * Config file name which should be uploaded to zookeeper + * Get config file name + * @return config file name which should be uploaded to zookeeper */ public abstract String getConfigFileName(); diff --git a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/util/JSONUtil.java b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/util/JSONUtil.java index 190f4dc..8360d27 100644 --- a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/util/JSONUtil.java +++ b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/util/JSONUtil.java @@ -90,6 +90,9 @@ public class JSONUtil { /** * WRITE JOSN IN FILE ( Delete existing file and create new file) + * @param jsonStr the json string that will be written + * @param outputFile file where the json content will be written + * @param beautify use beautify on json string */ public static synchronized void writeJSONInFile(String jsonStr, File outputFile, boolean beautify) { FileWriter fileWriter = null; @@ -129,13 +132,17 @@ public class JSONUtil { /** * GET VALUES FROM JSON BY GIVING KEY RECURSIVELY + * @param jsonStr the json string that will be read as an JSONObject + * @param keyName a key that will be gathered + * @param values results will be loaded here + * @return response with string type */ @SuppressWarnings("rawtypes") public static String getValuesOfKey(String jsonStr, String keyName, List<String> values) { if (values == null) { return null; } - Object jsonObj = null; + JSONObject jsonObj = null; try { jsonObj = new JSONObject(jsonStr); } catch (Exception e) { @@ -159,7 +166,7 @@ public class JSONUtil { return null; } - Iterator iterator = ((JSONObject) jsonObj).keys(); + Iterator iterator = jsonObj.keys(); if (iterator == null) { return null; } @@ -169,17 +176,17 @@ public class JSONUtil { if (key != null && key.equals(keyName)) { try { - String val = ((JSONObject) jsonObj).getString(key); + String val = jsonObj.getString(key); values.add(val); } catch (Exception e) { // ignore } - } else if ((((JSONObject) jsonObj).optJSONArray(key) != null) || (((JSONObject) jsonObj).optJSONObject(key) != null)) { + } else if ((jsonObj.optJSONArray(key) != null) || (jsonObj.optJSONObject(key) != null)) { String str = null; try { - str = getValuesOfKey("" + ((JSONObject) jsonObj).getString(key), keyName, values); + str = getValuesOfKey("" + jsonObj.getString(key), keyName, values); } catch (Exception e) { // ignore } diff --git a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/util/SolrUtil.java b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/util/SolrUtil.java index 95f3cdf..528ec25 100644 --- a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/util/SolrUtil.java +++ b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/util/SolrUtil.java @@ -45,6 +45,8 @@ public class SolrUtil { /** * Copied from Solr ClientUtils.escapeQueryChars and removed escaping * + * @param s input string that will be escaped + * @return string result after escaping */ public static String escapeQueryChars(String s) { StringBuilder sb = new StringBuilder(); @@ -204,8 +206,6 @@ public class SolrUtil { } return fieldTypeMap; } - - //============================================================================================================= public static void setFacetField(SolrQuery solrQuery, String facetField) { solrQuery.setFacet(true); diff --git a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogsearchKrbFilter.java b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogsearchKrbFilter.java index 3443c6b..a33f410 100644 --- a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogsearchKrbFilter.java +++ b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogsearchKrbFilter.java @@ -522,10 +522,6 @@ public class LogsearchKrbFilter implements Filter { } } - /** - * Delegates call to the servlet filter chain. Sub-classes my override this - * method to perform pre and post tasks. - */ protected void doFilter(FilterChain filterChain, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { filterChain.doFilter(request, response); diff --git a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogsearchTrustedProxyFilter.java b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogsearchTrustedProxyFilter.java index 33cfee3..e2f48fb 100644 --- a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogsearchTrustedProxyFilter.java +++ b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogsearchTrustedProxyFilter.java @@ -48,12 +48,14 @@ import java.util.List; /** * Filter servlet to handle trusted proxy authentication. - * It is disabled by default (see: {@link AuthPropsConfig#isTrustedProxy()}) <br/> - * There are 4 main configuration properties of this filter (allow authentication only if these are matches with the request details): <br/> - * - {@link AuthPropsConfig#getProxyUsers()} - Proxy users <br/> - * - {@link AuthPropsConfig#getProxyUserGroups()} - Proxy groups <br/> - * - {@link AuthPropsConfig#getProxyUserHosts()} - Proxy hosts <br/> - * - {@link AuthPropsConfig#getProxyIp()} - Proxy server IPs<br/> + * It is disabled by default (see: {@link AuthPropsConfig#isTrustedProxy()}) + * There are 4 main configuration properties of this filter (allow authentication only if these are matches with the request details): + * <pre> + * - {@link AuthPropsConfig#getProxyUsers()} - Proxy users + * - {@link AuthPropsConfig#getProxyUserGroups()} - Proxy groups + * - {@link AuthPropsConfig#getProxyUserHosts()} - Proxy hosts + * - {@link AuthPropsConfig#getProxyIp()} - Proxy server IPs + * </pre> */ public class LogsearchTrustedProxyFilter extends AbstractAuthenticationProcessingFilter { diff --git a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/security/LogsearchAbstractAuthenticationProvider.java b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/security/LogsearchAbstractAuthenticationProvider.java index 43854f1..3b1ed5d 100644 --- a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/security/LogsearchAbstractAuthenticationProvider.java +++ b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/security/LogsearchAbstractAuthenticationProvider.java @@ -39,6 +39,7 @@ abstract class LogsearchAbstractAuthenticationProvider implements Authentication /** * GET Default GrantedAuthority + * @return list of authorities */ protected List<GrantedAuthority> getAuthorities() { List<GrantedAuthority> grantedAuths = new ArrayList<>();