This is an automated email from the ASF dual-hosted git repository. oleewere pushed a commit to branch cloudbreak in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git
commit 9f6b8053242bbd6c2c978b3ecfda149233a604e1 Author: Olivér Szabó <oleew...@gmail.com> AuthorDate: Thu Nov 1 13:33:30 2018 +0100 AMBARI-24833. Create cloud input/output skeleton. (#17) * AMBARI-24833. Create cloud input/output skeleton. * AMBARI-24833. Use LogFeederMode as enum by spring * AMBARI-24833. Fix review issues + add secret stores. --- .../ambari/logfeeder/plugin/input/Input.java | 14 + ambari-logsearch-logfeeder/pom.xml | 6 +- .../logfeeder/common/LogEntryParseTester.java | 11 +- .../logfeeder/common/LogFeederConstants.java | 6 +- .../ambari/logfeeder/conf/ApplicationConfig.java | 136 ++++++-- .../ambari/logfeeder/conf/LogFeederMode.java | 58 +++ .../ambari/logfeeder/conf/LogFeederProps.java | 18 + .../logfeeder/conf/LogFeederSecurityConfig.java | 53 +-- .../conf/condition/CloudStorageCondition.java | 37 ++ .../conf/condition/NonCloudStorageCondition.java | 37 ++ .../logfeeder/credential/CompositeSecretStore.java | 39 +++ .../logfeeder/credential/EnvSecretStore.java | 37 ++ .../logfeeder/credential/FileSecretStore.java | 60 ++++ .../credential/HadoopCredentialSecretStore.java | 52 +++ .../logfeeder/credential/PropertySecretStore.java | 36 ++ .../ambari/logfeeder/credential/SecretStore.java | 30 ++ .../ambari/logfeeder/filter/FilterDummy.java | 58 +++ .../logfeeder/input/InputConfigUploader.java | 26 +- .../apache/ambari/logfeeder/input/InputFile.java | 16 +- .../ambari/logfeeder/input/InputManagerImpl.java | 12 +- .../apache/ambari/logfeeder/input/InputSocket.java | 2 +- .../ambari/logfeeder/manager/BlockMerger.java | 66 ++++ .../logfeeder/manager/InputConfigHolder.java | 80 +++++ .../InputConfigManager.java} | 388 ++++++--------------- .../manager/operations/InputConfigHandler.java | 53 +++ .../impl/CloudStorageInputConfigHandler.java | 101 ++++++ .../operations/impl/DefaultInputConfigHandler.java | 166 +++++++++ .../ambari/logfeeder/metrics/StatsLogger.java | 16 +- .../ambari/logfeeder/output/OutputManagerImpl.java | 1 + .../output/cloud/CloudStorageFactory.java | 32 ++ .../logfeeder/output/cloud/CloudStorageOutput.java | 30 ++ .../output/cloud/CloudStorageOutputManager.java | 102 ++++++ .../ambari/logfeeder/output/cloud/HDFSOutput.java | 74 ++++ .../src/main/resources/logfeeder.properties | 3 + .../ambari/logfeeder/output/OutputS3FileTest.java | 3 - .../ambari/logfeeder/output/S3UploaderTest.java | 4 - .../logfeeder/output/spool/LogSpoolerTest.java | 8 - .../org/apache/ambari/logsearch/LogSearch.java | 3 +- docker/test-config/logfeeder/logfeeder.properties | 3 +- 39 files changed, 1471 insertions(+), 406 deletions(-) diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java index 6228637..9ee4533 100644 --- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java +++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java @@ -71,6 +71,7 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER private LRUCache cache; private String cacheKeyField; private boolean initDefaultFields; + private boolean cloudInput = false; private MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false); /** @@ -400,4 +401,17 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER public void setInitDefaultFields(boolean initDefaultFields) { this.initDefaultFields = initDefaultFields; } + + public boolean isCloudInput() { + return cloudInput; + } + + public void setCloudInput(boolean cloudInput) { + this.cloudInput = cloudInput; + } + + public String getCloudModeSuffix() { + String mode = isCloudInput() ? "cloud": "default"; + return "mode=" + mode; + } } diff --git a/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch-logfeeder/pom.xml index 94af44f..71cf853 100644 --- a/ambari-logsearch-logfeeder/pom.xml +++ b/ambari-logsearch-logfeeder/pom.xml @@ -33,8 +33,8 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <spring.version>5.1.1.RELEASE</spring.version> - <spring-boot.version>2.0.6.RELEASE</spring-boot.version> + <spring.version>5.1.2.RELEASE</spring.version> + <spring-boot.version>2.1.0.RELEASE</spring-boot.version> </properties> <dependencies> @@ -96,7 +96,7 @@ <dependency> <groupId>org.easymock</groupId> <artifactId>easymock</artifactId> - <version>3.6</version> + <version>4.0.1</version> <scope>test</scope> </dependency> <dependency> diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java index b4a2a26..c4b9835 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java @@ -27,9 +27,11 @@ import java.util.Map; import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig; import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.manager.operations.impl.DefaultInputConfigHandler; import org.apache.ambari.logfeeder.input.InputFileMarker; import org.apache.ambari.logfeeder.input.InputManagerImpl; import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler; +import org.apache.ambari.logfeeder.manager.InputConfigManager; import org.apache.ambari.logfeeder.output.OutputManagerImpl; import org.apache.ambari.logfeeder.plugin.input.Input; import org.apache.ambari.logfeeder.plugin.input.InputMarker; @@ -89,8 +91,6 @@ public class LogEntryParseTester { @SuppressWarnings("unchecked") public Map<String, Object> parse() throws Exception { InputConfig inputConfig = getInputConfig(); - ConfigHandler configHandler = new ConfigHandler(null); - configHandler.setInputManager(new InputManagerImpl()); OutputManagerImpl outputManager = new OutputManagerImpl(); LogFeederProps logFeederProps = new LogFeederProps(); LogEntryCacheConfig logEntryCacheConfig = new LogEntryCacheConfig(); @@ -101,8 +101,11 @@ public class LogEntryParseTester { LogLevelFilterHandler logLevelFilterHandler = new LogLevelFilterHandler(null); logLevelFilterHandler.setLogFeederProps(logFeederProps); outputManager.setLogLevelFilterHandler(logLevelFilterHandler); - configHandler.setOutputManager(outputManager); - Input input = configHandler.getTestInput(inputConfig, logId); + DefaultInputConfigHandler configHandler = new DefaultInputConfigHandler(); + InputConfigManager inputConfigManager = new InputConfigManager( + null, new InputManagerImpl(), outputManager, configHandler,logFeederProps, true + ); + Input input = inputConfigManager.getTestInput(inputConfig, logId); input.init(logFeederProps); final Map<String, Object> result = new HashMap<>(); input.getFirstFilter().init(logFeederProps); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java index 1d56924..a9790b2 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java @@ -23,7 +23,9 @@ public class LogFeederConstants { public static final String ALL = "all"; public static final String LOGFEEDER_FILTER_NAME = "log_feeder_config"; public static final String LOG_LEVEL_UNKNOWN = "UNKNOWN"; - + + public static final String CLOUD_PREFIX = "cl-"; + // solr fields public static final String SOLR_LEVEL = "level"; public static final String SOLR_COMPONENT = "type"; @@ -107,4 +109,6 @@ public class LogFeederConstants { public static final String SOLR_ZK_CONNECTION_STRING = "logfeeder.solr.zk_connect_string"; public static final String SOLR_URLS = "logfeeder.solr.urls"; + public static final String CLOUD_STORAGE_MODE = "logfeeder.cloud.storage.mode"; + } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java index 086ad70..881b856 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,14 +20,20 @@ package org.apache.ambari.logfeeder.conf; import com.google.common.collect.Maps; import org.apache.ambari.logfeeder.common.LogFeederSolrClientFactory; +import org.apache.ambari.logfeeder.conf.condition.CloudStorageCondition; +import org.apache.ambari.logfeeder.conf.condition.NonCloudStorageCondition; import org.apache.ambari.logfeeder.container.docker.DockerContainerRegistry; import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler; +import org.apache.ambari.logfeeder.manager.operations.impl.CloudStorageInputConfigHandler; import org.apache.ambari.logfeeder.input.InputConfigUploader; import org.apache.ambari.logfeeder.input.InputManagerImpl; +import org.apache.ambari.logfeeder.manager.InputConfigManager; +import org.apache.ambari.logfeeder.output.cloud.CloudStorageOutputManager; import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager; import org.apache.ambari.logfeeder.input.file.checkpoint.FileCheckpointManager; import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler; -import org.apache.ambari.logfeeder.common.ConfigHandler; +import org.apache.ambari.logfeeder.manager.operations.impl.DefaultInputConfigHandler; import org.apache.ambari.logfeeder.metrics.MetricsManager; import org.apache.ambari.logfeeder.metrics.StatsLogger; import org.apache.ambari.logfeeder.output.OutputManagerImpl; @@ -44,6 +50,7 @@ import org.apache.ambari.logsearch.config.zookeeper.LogLevelFilterManagerZK; import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK; import org.apache.solr.client.solrj.SolrClient; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.PropertySource; @@ -72,13 +79,42 @@ public class ApplicationConfig { } @Bean + public CheckpointManager checkpointHandler() { + return new FileCheckpointManager(); + } + + @Bean + public DockerContainerRegistry containerRegistry() { + if (logFeederProps.isDockerContainerRegistryEnabled()) { + return DockerContainerRegistry.getInstance(logFeederProps.getProperties()); + } else { + return null; + } + } + + @Bean + public MetricsManager metricsManager() { + return new MetricsManager(); + } + + // Non-cloud configurations + + @Bean + @Conditional(NonCloudStorageCondition.class) + public StatsLogger statsLogger() throws Exception { + return new StatsLogger("statsLogger", inputConfigManager()); + } + + @Bean @DependsOn({"logSearchConfigLogFeeder", "propertyConfigurer"}) - public ConfigHandler configHandler() throws Exception { - return new ConfigHandler(logSearchConfigLogFeeder()); + @Conditional(NonCloudStorageCondition.class) + public DefaultInputConfigHandler inputConfigHandler() throws Exception { + return new DefaultInputConfigHandler(); } @Bean @DependsOn("logFeederSecurityConfig") + @Conditional(NonCloudStorageCondition.class) public LogSearchConfigLogFeeder logSearchConfigLogFeeder() throws Exception { if (logFeederProps.isUseLocalConfigs()) { LogSearchConfigLogFeeder logfeederConfig = LogSearchConfigFactory.createLogSearchConfigLogFeeder( @@ -96,6 +132,7 @@ public class ApplicationConfig { } @Bean + @Conditional(NonCloudStorageCondition.class) public LogLevelFilterManager logLevelFilterManager() throws Exception { if (logFeederProps.isSolrFilterStorage()) { SolrClient solrClient = new LogFeederSolrClientFactory().createSolrClient( @@ -107,13 +144,14 @@ public class ApplicationConfig { map.put(name, logFeederProps.getProperties().getProperty(name)); } return new LogLevelFilterManagerZK(map); - } else { // no default filter manager + } else { return null; } } @Bean @DependsOn("logLevelFilterHandler") + @Conditional(NonCloudStorageCondition.class) public LogLevelFilterUpdater logLevelFilterUpdater() throws Exception { if (logFeederProps.isSolrFilterStorage() && logFeederProps.isSolrFilterMonitor()) { LogLevelFilterUpdater logLevelFilterUpdater = new LogLevelFilterUpdaterSolr( @@ -121,56 +159,92 @@ public class ApplicationConfig { 30, (LogLevelFilterManagerSolr) logLevelFilterManager(), logFeederProps.getClusterName()); logLevelFilterUpdater.start(); return logLevelFilterUpdater; - } else { // no default filter updater - return null; } - } - @Bean - public MetricsManager metricsManager() { - return new MetricsManager(); + return null; } @Bean - @DependsOn("configHandler") + @Conditional(NonCloudStorageCondition.class) public LogLevelFilterHandler logLevelFilterHandler() throws Exception { return new LogLevelFilterHandler(logSearchConfigLogFeeder()); } @Bean - @DependsOn({"configHandler", "logSearchConfigLogFeeder", "logLevelFilterHandler"}) - public InputConfigUploader inputConfigUploader() { - return new InputConfigUploader(); + @Conditional(NonCloudStorageCondition.class) + @DependsOn({"inputConfigHandler"}) + public InputConfigUploader inputConfigUploader() throws Exception { + return new InputConfigUploader("Input Config Loader", logSearchConfigLogFeeder(), + inputConfigManager(), logLevelFilterHandler()); + } + + @Bean + @DependsOn({"containerRegistry", "checkpointHandler"}) + @Conditional(NonCloudStorageCondition.class) + public InputManager inputManager() { + return new InputManagerImpl("InputIsNotReadyMonitor"); + } + + @Bean + @Conditional(NonCloudStorageCondition.class) + public OutputManager outputManager() throws Exception { + return new OutputManagerImpl(); } @Bean - @DependsOn("inputConfigUploader") - public StatsLogger statsLogger() { - return new StatsLogger(); + @Conditional(NonCloudStorageCondition.class) + public InputConfigManager inputConfigManager() throws Exception { + return new InputConfigManager(logSearchConfigLogFeeder(), inputManager(), outputManager(), + inputConfigHandler(), logFeederProps, true); } + // Cloud configurations + + @Bean(name = "cloudLogSearchLogFeederConfig") + @Conditional(CloudStorageCondition.class) + public LogSearchConfigLogFeeder cloudLogSearchLogFeederConfig() throws Exception { + return LogSearchConfigFactory.createLogSearchConfigLogFeeder( + Maps.fromProperties(logFeederProps.getProperties()), + logFeederProps.getClusterName(), + LogSearchConfigLogFeederLocal.class, false); + } + + @Bean + @Conditional(CloudStorageCondition.class) + @DependsOn({"cloudInputConfigHandler"}) + public InputConfigUploader cloudInputConfigUploader() throws Exception { + return new InputConfigUploader("Cloud Input Config Loader", cloudLogSearchLogFeederConfig(), + cloudInputConfigManager(),null); + } @Bean @DependsOn({"containerRegistry", "checkpointHandler"}) - public InputManager inputManager() { - return new InputManagerImpl(); + @Conditional(CloudStorageCondition.class) + public InputManager cloudInputManager() { + return new InputManagerImpl("CloudInputIsNotReady"); } @Bean - public OutputManager outputManager() { - return new OutputManagerImpl(); + @Conditional(CloudStorageCondition.class) + public OutputManager cloudOutputManager() throws Exception { + return new CloudStorageOutputManager(); } @Bean - public CheckpointManager checkpointHandler() { - return new FileCheckpointManager(); + @Conditional(CloudStorageCondition.class) + public InputConfigHandler cloudInputConfigHandler() { + return new CloudStorageInputConfigHandler(); } @Bean - public DockerContainerRegistry containerRegistry() { - if (logFeederProps.isDockerContainerRegistryEnabled()) { - return DockerContainerRegistry.getInstance(logFeederProps.getProperties()); - } else { - return null; - } + @Conditional(CloudStorageCondition.class) + public InputConfigManager cloudInputConfigManager() throws Exception { + return new InputConfigManager(cloudLogSearchLogFeederConfig(), cloudInputManager(), cloudOutputManager(), + cloudInputConfigHandler(), logFeederProps, false); + } + + @Bean + @Conditional(CloudStorageCondition.class) + public StatsLogger cloudStatsLogger() throws Exception { + return new StatsLogger("cloudStatsLogger", cloudInputConfigManager()); } } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederMode.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederMode.java new file mode 100644 index 0000000..329f066 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederMode.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.conf; + +/** + * Global Log Feeder modes: + * <pre> + * - default: process logs based on input / filter / output JSON configurations + * - cloud: process logs based on input JSON configurations and send them directly into cloud storage (without filters) + * - hybrid: use both 2 above (together) + * </pre> + */ +public enum LogFeederMode { + DEFAULT("default"), CLOUD("cloud"), HYBRID("hybrid"); + + private String text; + + LogFeederMode(String text) { + this.text = text; + } + + public String getText() { + return this.text; + } + + public static LogFeederMode fromString(String text) { + for (LogFeederMode mode : LogFeederMode.values()) { + if (mode.text.equalsIgnoreCase(text)) { + return mode; + } + } + throw new IllegalArgumentException(String.format("String '%s' cannot be converted to LogFeederMode enum", text)); + } + + public static boolean isCloudStorage(LogFeederMode mode) { + return LogFeederMode.HYBRID.equals(mode) || LogFeederMode.CLOUD.equals(mode); + } + + public static boolean isNonCloudStorage(LogFeederMode mode) { + return LogFeederMode.HYBRID.equals(mode) || LogFeederMode.DEFAULT.equals(mode); + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java index 859de8f..dc1bfd2 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java @@ -199,6 +199,16 @@ public class LogFeederProps implements LogFeederProperties { @Value("${" + LogFeederConstants.SOLR_URLS + ":}") private String solrUrlsStr; + @LogSearchPropertyDescription( + name = LogFeederConstants.CLOUD_STORAGE_MODE, + description = "Option to support sending logs to cloud storage. You can choose between supporting only cloud storage, non-cloud storage or both", + examples = {"default", "cloud", "hybrid"}, + defaultValue = "default", + sources = {LogFeederConstants.CLOUD_STORAGE_MODE} + ) + @Value("${" + LogFeederConstants.CLOUD_STORAGE_MODE + ":default}") + public LogFeederMode cloudStorageMode; + @Inject private LogEntryCacheConfig logEntryCacheConfig; @@ -352,6 +362,14 @@ public class LogFeederProps implements LogFeederProperties { this.zkFilterStorage = zkFilterStorage; } + public LogFeederMode getCloudStorageMode() { + return cloudStorageMode; + } + + public void setCloudStorageMode(LogFeederMode cloudStorageMode) { + this.cloudStorageMode = cloudStorageMode; + } + public String[] getSolrUrls() { if (StringUtils.isNotBlank(this.solrUrlsStr)) { return this.solrUrlsStr.split(","); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java index aca1109..e047f60 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java @@ -19,17 +19,18 @@ package org.apache.ambari.logfeeder.conf; import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.credential.CompositeSecretStore; +import org.apache.ambari.logfeeder.credential.FileSecretStore; +import org.apache.ambari.logfeeder.credential.HadoopCredentialSecretStore; +import org.apache.ambari.logfeeder.credential.SecretStore; import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang3.ArrayUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Value; import javax.annotation.PostConstruct; import java.io.File; -import java.nio.charset.Charset; public class LogFeederSecurityConfig { @@ -142,48 +143,12 @@ public class LogFeederSecurityConfig { } private String getPassword(String propertyName, String fileName) { - String credentialStorePassword = getPasswordFromCredentialStore(propertyName); - if (credentialStorePassword != null) { - return credentialStorePassword; - } - - String filePassword = getPasswordFromFile(fileName); - if (filePassword != null) { - return filePassword; - } - - return LOGFEEDER_STORE_DEFAULT_PASSWORD; - } + SecretStore hadoopSecretStore = new HadoopCredentialSecretStore(propertyName, credentialStoreProviderPath); + SecretStore fileSecretStore = new FileSecretStore(String.join(File.separator, LOGFEEDER_CERT_DEFAULT_FOLDER, fileName), LOGFEEDER_STORE_DEFAULT_PASSWORD); + SecretStore compositeSecretStore = new CompositeSecretStore(hadoopSecretStore, fileSecretStore); - private String getPasswordFromCredentialStore(String propertyName) { - try { - if (StringUtils.isEmpty(credentialStoreProviderPath)) { - return null; - } - - org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration(); - config.set(CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY, credentialStoreProviderPath); - char[] passwordChars = config.getPassword(propertyName); - return (ArrayUtils.isNotEmpty(passwordChars)) ? new String(passwordChars) : null; - } catch (Exception e) { - logger.warn(String.format("Could not load password %s from credential store, using default password", propertyName)); - return null; - } - } - - private String getPasswordFromFile(String fileName) { - try { - File pwdFile = new File(LOGFEEDER_CERT_DEFAULT_FOLDER, fileName); - if (!pwdFile.exists()) { - FileUtils.writeStringToFile(pwdFile, LOGFEEDER_STORE_DEFAULT_PASSWORD, Charset.defaultCharset()); - return LOGFEEDER_STORE_DEFAULT_PASSWORD; - } else { - return FileUtils.readFileToString(pwdFile, Charset.defaultCharset()); - } - } catch (Exception e) { - logger.warn("Exception occurred during read/write password file for keystore/truststore.", e); - return null; - } + char[] password = compositeSecretStore.getSecret(); + return password == null ? LOGFEEDER_STORE_DEFAULT_PASSWORD: new String(password); } } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/CloudStorageCondition.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/CloudStorageCondition.java new file mode 100644 index 0000000..3860699 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/CloudStorageCondition.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.conf.condition; + +import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.conf.LogFeederMode; +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.core.type.AnnotatedTypeMetadata; + +/** + * Global condition that checks is the application started in cloud or hybrid mode + */ +public class CloudStorageCondition implements Condition { + + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { + return LogFeederMode.isCloudStorage(LogFeederMode.fromString( + context.getEnvironment().getProperty(LogFeederConstants.CLOUD_STORAGE_MODE))); + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/NonCloudStorageCondition.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/NonCloudStorageCondition.java new file mode 100644 index 0000000..fee0efa --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/NonCloudStorageCondition.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.conf.condition; + +import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.conf.LogFeederMode; +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.core.type.AnnotatedTypeMetadata; + +/** + * Global condition that checks is the application started in default or hybrid mode + */ +public class NonCloudStorageCondition implements Condition { + + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { + return LogFeederMode.isNonCloudStorage(LogFeederMode.fromString( + context.getEnvironment().getProperty(LogFeederConstants.CLOUD_STORAGE_MODE))); + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/CompositeSecretStore.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/CompositeSecretStore.java new file mode 100644 index 0000000..7edaf26 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/CompositeSecretStore.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.credential; + +public class CompositeSecretStore implements SecretStore { + + private SecretStore[] secretStores; + + public CompositeSecretStore(SecretStore... secretStores) { + this.secretStores = secretStores; + } + + @Override + public char[] getSecret() { + for (SecretStore secretStore : secretStores) { + char[] secret = secretStore.getSecret(); + if (secret != null) { + return secret; + } + } + return null; + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/EnvSecretStore.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/EnvSecretStore.java new file mode 100644 index 0000000..5d82ee1 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/EnvSecretStore.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.credential; + +public class EnvSecretStore implements SecretStore { + + private final String property; + + public EnvSecretStore(String property) { + this.property = property; + } + + @Override + public char[] getSecret() { + String envValue = System.getenv(property); + if (envValue != null) { + return envValue.toCharArray(); + } + return null; + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/FileSecretStore.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/FileSecretStore.java new file mode 100644 index 0000000..b9687e0 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/FileSecretStore.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.credential; + +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.File; +import java.nio.charset.Charset; + +public class FileSecretStore implements SecretStore { + + private static final Logger logger = LogManager.getLogger(FileSecretStore.class); + + private final String fileLocation; + private final String defaultSecret; + + public FileSecretStore(String fileLocation, String defaultSecret) { + this.fileLocation = fileLocation; + this.defaultSecret = defaultSecret; + } + + public FileSecretStore(String fileLocation) { + this.fileLocation = fileLocation; + this.defaultSecret = null; + } + + @Override + public char[] getSecret() { + try { + File pwdFile = new File(fileLocation); + if (!pwdFile.exists() && defaultSecret != null) { + FileUtils.writeStringToFile(pwdFile, defaultSecret, Charset.defaultCharset()); + return defaultSecret.toCharArray(); + } else { + return FileUtils.readFileToString(pwdFile, Charset.defaultCharset()).toCharArray(); + } + } catch (Exception e) { + logger.warn("Exception occurred during read/write password file.", e); + return null; + } + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/HadoopCredentialSecretStore.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/HadoopCredentialSecretStore.java new file mode 100644 index 0000000..7e1237e --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/HadoopCredentialSecretStore.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.credential; + +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class HadoopCredentialSecretStore implements SecretStore { + + private static final Logger logger = LogManager.getLogger(HadoopCredentialSecretStore.class); + private static final String CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY = "hadoop.security.credential.provider.path"; + + private final String credentialStoreProviderPath; + private final String property; + + public HadoopCredentialSecretStore(String property, String credentialStoreProviderPath) { + this.property = property; + this.credentialStoreProviderPath = credentialStoreProviderPath; + } + + @Override + public char[] getSecret() { + try { + if (StringUtils.isBlank(credentialStoreProviderPath)) { + return null; + } + org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration(); + config.set(CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY, credentialStoreProviderPath); + return config.getPassword(property); + } catch (Exception e) { + logger.warn("Could not load password {} from credential store.", property); + return null; + } + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PropertySecretStore.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PropertySecretStore.java new file mode 100644 index 0000000..046fe7c --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PropertySecretStore.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.credential; + +public class PropertySecretStore implements SecretStore { + private final String property; + + public PropertySecretStore(String property) { + this.property = property; + } + + @Override + public char[] getSecret() { + String propValue = System.getProperty(property); + if (propValue != null) { + return propValue.toCharArray(); + } + return null; + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/SecretStore.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/SecretStore.java new file mode 100644 index 0000000..c257ff4 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/SecretStore.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.credential; + +/** + * Store secrets in character array + */ +public interface SecretStore { + /** + * Gather a secret - implement the way + * @return secret character array + */ + char[] getSecret(); +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterDummy.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterDummy.java new file mode 100644 index 0000000..e1946f8 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterDummy.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.filter; + +import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.input.InputFile; +import org.apache.ambari.logfeeder.plugin.filter.Filter; +import org.apache.ambari.logfeeder.plugin.input.Input; +import org.apache.ambari.logfeeder.plugin.input.InputMarker; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor; +import org.apache.commons.lang3.BooleanUtils; + +/** + * Simple dummy filter, not supported by config api, create it manually + */ +public class FilterDummy extends Filter<LogFeederProps> { + + private boolean dockerEnabled = false; + + @Override + public void init(LogFeederProps logFeederProps) throws Exception { + if (logFeederProps.isDockerContainerRegistryEnabled()) { + Input input = getInput(); + if (input instanceof InputFile) { + dockerEnabled = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor) input.getInputDescriptor()).getDockerEnabled(), false); + } + } + } + + @Override + public void apply(String inputStr, InputMarker inputMarker) throws Exception { + if (dockerEnabled) { + inputStr = DockerLogFilter.getLogFromDockerJson(inputStr); + } + super.apply(inputStr, inputMarker); + } + + @Override + public String getShortDescription() { + return "filter:filter=dummy"; + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java index 57f5b3d..283273a 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java @@ -20,8 +20,8 @@ package org.apache.ambari.logfeeder.input; import com.google.common.io.Files; import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler; -import org.apache.ambari.logfeeder.common.ConfigHandler; import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.manager.InputConfigManager; import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -50,19 +50,17 @@ public class InputConfigUploader extends Thread { private final Pattern serviceNamePattern = Pattern.compile("input.config-(.+).json"); @Inject - private LogSearchConfigLogFeeder config; - - @Inject private LogFeederProps logFeederProps; - @Inject - private LogLevelFilterHandler logLevelFilterHandler; - - @Inject - private ConfigHandler configHandler; + private final InputConfigManager inputConfigManager; + private final LogSearchConfigLogFeeder config; + private final LogLevelFilterHandler logLevelFilterHandler; - public InputConfigUploader() { - super("Input Config Loader"); + public InputConfigUploader(String name, LogSearchConfigLogFeeder config, InputConfigManager inputConfigManager, LogLevelFilterHandler logLevelFilterHandler) { + super(name); + this.config = config; + this.inputConfigManager = inputConfigManager; + this.logLevelFilterHandler = logLevelFilterHandler; setDaemon(true); } @@ -70,7 +68,9 @@ public class InputConfigUploader extends Thread { public void init() throws Exception { this.configDir = new File(logFeederProps.getConfDir()); this.start(); - config.monitorInputConfigChanges(configHandler, logLevelFilterHandler, logFeederProps.getClusterName()); + if (config != null) { + config.monitorInputConfigChanges(inputConfigManager, logLevelFilterHandler, logFeederProps.getClusterName()); + } } @Override @@ -85,7 +85,7 @@ public class InputConfigUploader extends Thread { m.find(); String serviceName = m.group(1); String inputConfig = Files.toString(inputConfigFile, Charset.defaultCharset()); - if (!config.inputConfigExists(serviceName)) { + if (config != null && !config.inputConfigExists(serviceName)) { config.createInputConfig(logFeederProps.getClusterName(), serviceName, inputConfig); } filesHandled.add(inputConfigFile.getAbsolutePath()); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java index b8eb5e9..64428f6 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java @@ -18,6 +18,7 @@ */ package org.apache.ambari.logfeeder.input; +import org.apache.ambari.logfeeder.common.LogFeederConstants; import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig; import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.container.docker.DockerContainerRegistry; @@ -106,7 +107,7 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker, InputFileB if (dockerContainerRegistry != null) { Map<String, Map<String, DockerMetadata>> metadataMap = dockerContainerRegistry.getContainerMetadataMap(); String logType = getLogType(); - if (metadataMap.containsKey(logType)) { + if (metadataMap.containsKey(StringUtils.removeStart(logType, LogFeederConstants.CLOUD_PREFIX))) { isReady = true; } } else { @@ -140,12 +141,12 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker, InputFileB public String getNameForThread() { if (filePath != null) { try { - return (getType() + "=" + (new File(filePath)).getName()); + return (getType() + "=" + (new File(filePath)).getName() + ";" + getCloudModeSuffix()); } catch (Throwable ex) { logger.warn("Couldn't get basename for filePath=" + filePath, ex); } } - return super.getNameForThread() + ":" + getType(); + return super.getNameForThread() + ":" + getType() + ";" + getCloudModeSuffix(); } @Override @@ -177,8 +178,9 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker, InputFileB Map<String, Map<String, DockerMetadata>> metadataMap = dockerContainerRegistry.getContainerMetadataMap(); String logType = getLogType(); threadGroup = new ThreadGroup("docker-parent-" + logType); - if (metadataMap.containsKey(logType)) { - Map<String, DockerMetadata> dockerMetadataMap = metadataMap.get(logType); + String replacedLogType = StringUtils.removeStart(logType, LogFeederConstants.CLOUD_PREFIX); + if (metadataMap.containsKey(replacedLogType)) { + Map<String, DockerMetadata> dockerMetadataMap = metadataMap.get(replacedLogType); for (Map.Entry<String, DockerMetadata> dockerMetadataEntry : dockerMetadataMap.entrySet()) { try { startNewChildDockerInputFileThread(dockerMetadataEntry.getValue()); @@ -198,9 +200,9 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker, InputFileB for (Map.Entry<String, List<File>> folderFileEntry : getFolderMap().entrySet()) { startNewChildInputFileThread(folderFileEntry); } - logFilePathUpdaterThread = new Thread(new LogFilePathUpdateMonitor((InputFile) this, pathUpdateIntervalMin, detachTimeMin), "logfile_path_updater=" + filePath); + logFilePathUpdaterThread = new Thread(new LogFilePathUpdateMonitor((InputFile) this, pathUpdateIntervalMin, detachTimeMin), String.format("logfile_path_updater=%s;%s", filePath, getCloudModeSuffix())); logFilePathUpdaterThread.setDaemon(true); - logFileDetacherThread = new Thread(new LogFileDetachMonitor((InputFile) this, detachIntervalMin, detachTimeMin), "logfile_detacher=" + filePath); + logFileDetacherThread = new Thread(new LogFileDetachMonitor((InputFile) this, detachIntervalMin, detachTimeMin), String.format("logfile_detacher=%s;%s", filePath, getCloudModeSuffix())); logFileDetacherThread.setDaemon(true); logFilePathUpdaterThread.start(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java index 70e54d6..bd3e045 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java @@ -62,6 +62,16 @@ public class InputManagerImpl extends InputManager { return inputs.get(serviceName); } + private final String notReadyThreadName; + + public InputManagerImpl() { + this.notReadyThreadName = "InputIsNotReadyMonitor"; + } + + public InputManagerImpl(String notReadyThreadName) { + this.notReadyThreadName = notReadyThreadName; + } + @Override public void add(String serviceName, Input input) { List<Input> inputList = inputs.computeIfAbsent(serviceName, k -> new ArrayList<>()); @@ -130,7 +140,7 @@ public class InputManagerImpl extends InputManager { } private void startMonitorThread() { - Thread inputIsReadyMonitor = new Thread("InputIsReadyMonitor") { + Thread inputIsReadyMonitor = new Thread(notReadyThreadName) { @Override public void run() { logger.info("Going to monitor for these missing files: " + notReadyList.toString()); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java index 965aa84..1a15395 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java @@ -123,7 +123,7 @@ public class InputSocket extends Input<LogFeederProps, InputSocketMarker, InputS @Override public String getNameForThread() { - return String.format("socket=%s-%s-%s", getLogType(), this.protocol, this.port); + return String.format("socket=%s-%s-%s;%s", getLogType(), this.protocol, this.port, getCloudModeSuffix()); } @Override diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/BlockMerger.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/BlockMerger.java new file mode 100644 index 0000000..c5a9172 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/BlockMerger.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.manager; + +import org.apache.ambari.logfeeder.util.LogFeederUtil; + +import java.util.HashMap; +import java.util.Map; + +/** + * Helper for merge global and input configurations + */ +public class BlockMerger { + private BlockMerger() { + } + + @SuppressWarnings("unchecked") + public static void mergeBlocks(Map<String, Object> fromMap, Map<String, Object> toMap) { + for (String key : fromMap.keySet()) { + Object objValue = fromMap.get(key); + if (objValue == null) { + continue; + } + if (objValue instanceof Map) { + Map<String, Object> globalFields = LogFeederUtil.cloneObject((Map<String, Object>) objValue); + + Map<String, Object> localFields = (Map<String, Object>) toMap.get(key); + if (localFields == null) { + localFields = new HashMap<>(); + toMap.put(key, localFields); + } + + if (globalFields != null) { + for (String fieldKey : globalFields.keySet()) { + if (!localFields.containsKey(fieldKey)) { + localFields.put(fieldKey, globalFields.get(fieldKey)); + } + } + } + } + } + + // Let's add the rest of the top level fields if missing + for (String key : fromMap.keySet()) { + if (!toMap.containsKey(key)) { + toMap.put(key, fromMap.get(key)); + } + } + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigHolder.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigHolder.java new file mode 100644 index 0000000..35ad1bd --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigHolder.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.manager; + +import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.plugin.manager.InputManager; +import org.apache.ambari.logfeeder.plugin.manager.OutputManager; +import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder; +import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Holds common configuration/manager objects for input config managers/handlers in order to provide 1 object as input (instead of many) + */ +public class InputConfigHolder { + + private final LogFeederProps logFeederProps; + private final LogSearchConfigLogFeeder config; + private final List<InputDescriptor> inputConfigList = new ArrayList<>(); + private final List<FilterDescriptor> filterConfigList = new ArrayList<>(); + private final List<Map<String, Object>> outputConfigList = new ArrayList<>(); + + private final InputManager inputManager; + private final OutputManager outputManager; + + public InputConfigHolder(LogSearchConfigLogFeeder config, InputManager inputManager, OutputManager outputManager, LogFeederProps logFeederProps) { + this.logFeederProps = logFeederProps; + this.config = config; + this.inputManager = inputManager; + this.outputManager = outputManager; + } + + public LogFeederProps getLogFeederProps() { + return logFeederProps; + } + + public List<InputDescriptor> getInputConfigList() { + return inputConfigList; + } + + public List<FilterDescriptor> getFilterConfigList() { + return filterConfigList; + } + + public List<Map<String, Object>> getOutputConfigList() { + return outputConfigList; + } + + public InputManager getInputManager() { + return inputManager; + } + + public OutputManager getOutputManager() { + return outputManager; + } + + public LogSearchConfigLogFeeder getConfig() { + return config; + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigManager.java similarity index 53% rename from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java rename to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigManager.java index 61f726c..925bb65 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigManager.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -16,15 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.ambari.logfeeder.common; +package org.apache.ambari.logfeeder.manager; import com.google.common.collect.Maps; import com.google.gson.reflect.TypeToken; import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler; import org.apache.ambari.logfeeder.input.InputSimulate; import org.apache.ambari.logfeeder.plugin.common.AliasUtil; import org.apache.ambari.logfeeder.plugin.common.MetricData; -import org.apache.ambari.logfeeder.plugin.filter.Filter; import org.apache.ambari.logfeeder.plugin.input.Input; import org.apache.ambari.logfeeder.plugin.manager.InputManager; import org.apache.ambari.logfeeder.plugin.manager.OutputManager; @@ -40,7 +40,6 @@ import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfi import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputDescriptorImpl; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -48,7 +47,6 @@ import org.springframework.core.io.ClassPathResource; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.inject.Inject; import java.io.BufferedInputStream; import java.io.File; import java.io.FileNotFoundException; @@ -56,50 +54,115 @@ import java.lang.reflect.Type; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; /** - * Initialize / close input and output managers and monitors input configuration changes. + * Facade class for input config operations (add / load / remove configs and start or close input monitoring) */ -public class ConfigHandler implements InputConfigMonitor { - private static final Logger logger = LogManager.getLogger(ConfigHandler.class); +public class InputConfigManager implements InputConfigMonitor { - private final LogSearchConfigLogFeeder logSearchConfig; + private Logger logger = LogManager.getLogger(InputConfigManager.class); - @Inject - private InputManager inputManager; - @Inject - private OutputManager outputManager; - @Inject - private LogFeederProps logFeederProps; + private final InputConfigHandler inputConfigHandler; + private final LogSearchConfigLogFeeder logSearchConfig; + private final LogFeederProps logFeederProps; + private final InputConfigHolder inputConfigHolder; + private final boolean loadOutput; private final Map<String, Object> globalConfigs = new HashMap<>(); private final List<String> globalConfigJsons = new ArrayList<>(); - private final List<InputDescriptor> inputConfigList = new ArrayList<>(); - private final List<FilterDescriptor> filterConfigList = new ArrayList<>(); - private final List<Map<String, Object>> outputConfigList = new ArrayList<>(); - private boolean simulateMode = false; - public ConfigHandler(LogSearchConfigLogFeeder logSearchConfig) { + public InputConfigManager(LogSearchConfigLogFeeder logSearchConfig, InputManager inputManager, + OutputManager outputManager, InputConfigHandler inputConfigHandler, + LogFeederProps logFeederProps, boolean loadOutput) { this.logSearchConfig = logSearchConfig; + this.inputConfigHandler = inputConfigHandler; + this.logFeederProps = logFeederProps; + this.loadOutput = loadOutput; + this.inputConfigHolder = new InputConfigHolder(logSearchConfig, inputManager, outputManager, logFeederProps); } @PostConstruct public void init() throws Exception { loadConfigFiles(); logSearchConfig.init(Maps.fromProperties(logFeederProps.getProperties()), logFeederProps.getClusterName()); - loadOutputs(); + inputConfigHandler.init(inputConfigHolder); simulateIfNeeded(); + if (loadOutput) { + loadOutputs(); + } + inputConfigHolder.getInputManager().init(); + inputConfigHolder.getOutputManager().init(); + } + + @Override + public List<String> getGlobalConfigJsons() { + return this.globalConfigJsons; + } + + @Override + public void loadInputConfigs(String serviceName, InputConfig inputConfig) throws Exception { + inputConfigHolder.getInputConfigList().clear(); + inputConfigHolder.getFilterConfigList().clear(); + inputConfigHolder.getInputConfigList().addAll(inputConfig.getInput()); + inputConfigHolder.getFilterConfigList().addAll(inputConfig.getFilter()); + if (simulateMode) { + InputSimulate.loadTypeToFilePath(inputConfigHolder.getInputConfigList()); + } else { + inputConfigHandler.loadInputs(serviceName, inputConfigHolder, inputConfig); + inputConfigHandler.assignInputsToOutputs(serviceName, inputConfigHolder, inputConfig); + } + inputConfigHolder.getInputManager().startInputs(serviceName); + } - inputManager.init(); - outputManager.init(); + @Override + public void removeInputs(String serviceName) { + inputConfigHolder.getInputManager().removeInputsForService(serviceName); + } + + public void cleanCheckPointFiles() { + inputConfigHolder.getInputManager().getCheckpointHandler().cleanupCheckpoints(); + } + + public void logStats() { + inputConfigHolder.getInputManager().logStats(); + inputConfigHolder.getOutputManager().logStats(); + } + + public void addMetrics(List<MetricData> metricsList) { + inputConfigHolder.getInputManager().addMetricsContainers(metricsList); + inputConfigHolder.getOutputManager().addMetricsContainers(metricsList); + } + + @PreDestroy + public void close() { + inputConfigHolder.getInputManager().close(); + inputConfigHolder.getOutputManager().close(); + inputConfigHolder.getInputManager().checkInAll(); + } + + public Input getTestInput(InputConfig inputConfig, String logId) { + for (InputDescriptor inputDescriptor : inputConfig.getInput()) { + if (inputDescriptor.getType().equals(logId)) { + inputConfigHolder.getInputConfigList().add(inputDescriptor); + break; + } + } + if (inputConfigHolder.getInputConfigList().isEmpty()) { + throw new IllegalArgumentException("Log Id " + logId + " was not found in shipper configuriaton"); + } + + for (FilterDescriptor filterDescriptor : inputConfig.getFilter()) { + inputConfigHolder.getFilterConfigList().add(filterDescriptor); + } + inputConfigHandler.loadInputs("test", inputConfigHolder, inputConfig); + List<Input> inputList = inputConfigHolder.getInputManager().getInputList("test"); + + return inputList != null && inputList.size() == 1 ? inputList.get(0) : null; } private void loadConfigFiles() throws Exception { @@ -121,13 +184,11 @@ public class ConfigHandler implements InputConfigMonitor { private List<String> getConfigFiles() { List<String> configFiles = new ArrayList<>(); - String logFeederConfigFilesProperty = logFeederProps.getConfigFiles(); logger.info("logfeeder.config.files=" + logFeederConfigFilesProperty); if (logFeederConfigFilesProperty != null) { configFiles.addAll(Arrays.asList(logFeederConfigFilesProperty.split(","))); } - return configFiles; } @@ -152,57 +213,8 @@ public class ConfigHandler implements InputConfigMonitor { } } - @Override - public void loadInputConfigs(String serviceName, InputConfig inputConfig) throws Exception { - inputConfigList.clear(); - filterConfigList.clear(); - - inputConfigList.addAll(inputConfig.getInput()); - filterConfigList.addAll(inputConfig.getFilter()); - - if (simulateMode) { - InputSimulate.loadTypeToFilePath(inputConfigList); - } else { - loadInputs(serviceName); - loadFilters(serviceName); - assignOutputsToInputs(serviceName); - - inputManager.startInputs(serviceName); - } - } - - @Override - public void removeInputs(String serviceName) { - inputManager.removeInputsForService(serviceName); - } - - public Input getTestInput(InputConfig inputConfig, String logId) { - for (InputDescriptor inputDescriptor : inputConfig.getInput()) { - if (inputDescriptor.getType().equals(logId)) { - inputConfigList.add(inputDescriptor); - break; - } - } - if (inputConfigList.isEmpty()) { - throw new IllegalArgumentException("Log Id " + logId + " was not found in shipper configuriaton"); - } - - for (FilterDescriptor filterDescriptor : inputConfig.getFilter()) { -// if ("grok".equals(filterDescriptor.getFilter())) { -// // Thus ensure that the log entry passed will be parsed immediately -// ((FilterGrokDescriptor)filterDescriptor).setMultilinePattern(null); -// } - filterConfigList.add(filterDescriptor); - } - loadInputs("test"); - loadFilters("test"); - List<Input> inputList = inputManager.getInputList("test"); - - return inputList != null && inputList.size() == 1 ? inputList.get(0) : null; - } - @SuppressWarnings("unchecked") - public void loadConfigs(String configData) throws Exception { + private void loadConfigs(String configData) throws Exception { Type type = new TypeToken<Map<String, Object>>() {}.getType(); Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(configData, type); @@ -215,7 +227,7 @@ public class ConfigHandler implements InputConfigMonitor { break; case "output" : List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) configMap.get(key); - outputConfigList.addAll(outputConfig); + inputConfigHolder.getOutputConfigList().addAll(outputConfig); break; default : logger.warn("Unknown config key: " + key); @@ -223,39 +235,13 @@ public class ConfigHandler implements InputConfigMonitor { } } - @Override - public List<String> getGlobalConfigJsons() { - return globalConfigJsons; - } - - private void simulateIfNeeded() throws Exception { - int simulatedInputNumber = logFeederProps.getInputSimulateConfig().getSimulateInputNumber(); - if (simulatedInputNumber == 0) - return; - - InputConfigImpl simulateInputConfig = new InputConfigImpl(); - List<InputDescriptorImpl> inputConfigDescriptors = new ArrayList<>(); - simulateInputConfig.setInput(inputConfigDescriptors); - simulateInputConfig.setFilter(new ArrayList<FilterDescriptorImpl>()); - for (int i = 0; i < simulatedInputNumber; i++) { - InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {}; - inputDescriptor.setSource("simulate"); - inputDescriptor.setRowtype("service"); - inputDescriptor.setAddFields(new HashMap<String, String>()); - inputConfigDescriptors.add(inputDescriptor); - } - - loadInputConfigs("Simulation", simulateInputConfig); - - simulateMode = true; - } - private void loadOutputs() { - for (Map<String, Object> map : outputConfigList) { + for (Map<String, Object> map : inputConfigHolder.getOutputConfigList()) { if (map == null) { + logger.warn("Output map is empty. Skipping..."); continue; } - mergeBlocks(globalConfigs, map); + BlockMerger.mergeBlocks(globalConfigs, map); String value = (String) map.get("destination"); if (StringUtils.isEmpty(value)) { @@ -274,182 +260,32 @@ public class ConfigHandler implements InputConfigMonitor { // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input if (output.isEnabled()) { output.logConfigs(); - outputManager.add(output); + inputConfigHolder.getOutputManager().add(output); } else { logger.info("Output is disabled. So ignoring it. " + output.getShortDescription()); } } } - private void loadInputs(String serviceName) { - for (InputDescriptor inputDescriptor : inputConfigList) { - if (inputDescriptor == null) { - continue; - } - - String source = (String) inputDescriptor.getSource(); - if (StringUtils.isEmpty(source)) { - logger.error("Input block doesn't have source element"); - continue; - } - Input input = (Input) AliasUtil.getClassInstance(source, AliasUtil.AliasType.INPUT); - if (input == null) { - logger.error("Input object could not be found"); - continue; - } - input.setType(source); - input.setLogType(inputDescriptor.getType()); - input.loadConfig(inputDescriptor); - - if (input.isEnabled()) { - input.setOutputManager(outputManager); - input.setInputManager(inputManager); - inputManager.add(serviceName, input); - input.logConfigs(); - } else { - logger.info("Input is disabled. So ignoring it. " + input.getShortDescription()); - } - } - } - - private void loadFilters(String serviceName) { - sortFilters(); - - List<Input> toRemoveInputList = new ArrayList<Input>(); - for (Input input : inputManager.getInputList(serviceName)) { - for (FilterDescriptor filterDescriptor : filterConfigList) { - if (filterDescriptor == null) { - continue; - } - if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) { - logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " because it is disabled"); - continue; - } - if (!input.isFilterRequired(filterDescriptor)) { - logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " for input " + input.getShortDescription()); - continue; - } - - String value = filterDescriptor.getFilter(); - if (StringUtils.isEmpty(value)) { - logger.error("Filter block doesn't have filter element"); - continue; - } - Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasUtil.AliasType.FILTER); - if (filter == null) { - logger.error("Filter object could not be found"); - continue; - } - filter.loadConfig(filterDescriptor); - filter.setInput(input); - - filter.setOutputManager(outputManager); - input.addFilter(filter); - filter.logConfigs(); - } - - if (input.getFirstFilter() == null) { - toRemoveInputList.add(input); - } - } - - for (Input toRemoveInput : toRemoveInputList) { - logger.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription()); - inputManager.removeInput(toRemoveInput); - } - } - - private void sortFilters() { - Collections.sort(filterConfigList, (o1, o2) -> { - Integer o1Sort = o1.getSortOrder(); - Integer o2Sort = o2.getSortOrder(); - if (o1Sort == null || o2Sort == null) { - return 0; - } - - return o1Sort - o2Sort; - }); - } - - private void assignOutputsToInputs(String serviceName) { - Set<Output> usedOutputSet = new HashSet<Output>(); - for (Input input : inputManager.getInputList(serviceName)) { - for (Output output : outputManager.getOutputs()) { - if (input.isOutputRequired(output)) { - usedOutputSet.add(output); - input.addOutput(output); - } - } - } - - // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager - for (Output output : InputSimulate.getSimulateOutputs()) { - output.setLogSearchConfig(logSearchConfig); - outputManager.add(output); - usedOutputSet.add(output); - } - } - - @SuppressWarnings("unchecked") - private void mergeBlocks(Map<String, Object> fromMap, Map<String, Object> toMap) { - for (String key : fromMap.keySet()) { - Object objValue = fromMap.get(key); - if (objValue == null) { - continue; - } - if (objValue instanceof Map) { - Map<String, Object> globalFields = LogFeederUtil.cloneObject((Map<String, Object>) objValue); - - Map<String, Object> localFields = (Map<String, Object>) toMap.get(key); - if (localFields == null) { - localFields = new HashMap<String, Object>(); - toMap.put(key, localFields); - } - - if (globalFields != null) { - for (String fieldKey : globalFields.keySet()) { - if (!localFields.containsKey(fieldKey)) { - localFields.put(fieldKey, globalFields.get(fieldKey)); - } - } - } - } - } + private void simulateIfNeeded() throws Exception { + int simulatedInputNumber = inputConfigHolder.getLogFeederProps().getInputSimulateConfig().getSimulateInputNumber(); + if (simulatedInputNumber == 0) + return; - // Let's add the rest of the top level fields if missing - for (String key : fromMap.keySet()) { - if (!toMap.containsKey(key)) { - toMap.put(key, fromMap.get(key)); - } + InputConfigImpl simulateInputConfig = new InputConfigImpl(); + List<InputDescriptorImpl> inputConfigDescriptors = new ArrayList<>(); + simulateInputConfig.setInput(inputConfigDescriptors); + simulateInputConfig.setFilter(new ArrayList<FilterDescriptorImpl>()); + for (int i = 0; i < simulatedInputNumber; i++) { + InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {}; + inputDescriptor.setSource("simulate"); + inputDescriptor.setRowtype("service"); + inputDescriptor.setAddFields(new HashMap<String, String>()); + inputConfigDescriptors.add(inputDescriptor); } - } - - public void cleanCheckPointFiles() { - inputManager.getCheckpointHandler().cleanupCheckpoints(); - } - - public void logStats() { - inputManager.logStats(); - outputManager.logStats(); - } - public void addMetrics(List<MetricData> metricsList) { - inputManager.addMetricsContainers(metricsList); - outputManager.addMetricsContainers(metricsList); - } - - @PreDestroy - public void close() { - inputManager.close(); - outputManager.close(); - inputManager.checkInAll(); - } - - public void setInputManager(InputManager inputManager) { - this.inputManager = inputManager; - } + loadInputConfigs("Simulation", simulateInputConfig); - public void setOutputManager(OutputManager outputManager) { - this.outputManager = outputManager; + simulateMode = true; } } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/InputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/InputConfigHandler.java new file mode 100644 index 0000000..2e80d0d --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/InputConfigHandler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.manager.operations; + +import org.apache.ambari.logfeeder.manager.InputConfigHolder;; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; + +/** + * Holds operations regarding input config handling. (init configs, load input configs and assign inputs to outputs) + */ +public interface InputConfigHandler { + + /** + * Initialization step before loading inputs/filter/outputs + * @param inputConfigHolder object that holds input/filter/output configuration details + * @throws Exception error during initialization + */ + void init(InputConfigHolder inputConfigHolder) throws Exception; + + /** + * Step during input/filter configurations initialization + * @param serviceName group of input configs + * @param inputConfigHolder object that holds input/filter/output configuration details + * @param config input/filter config object + */ + void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig config); + + /** + * Assign inputs to outputs - after inputs/filters/outputs are loaded + * @param serviceName group of input configs + * @param inputConfigHolder object that holds input/filter/output configuration details + * @param config input/filter config object + * @throws Exception error during input/output assignment + */ + void assignInputsToOutputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig config) throws Exception; + +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java new file mode 100644 index 0000000..deb3a91 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.manager.operations.impl; + +import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.conf.LogFeederMode; +import org.apache.ambari.logfeeder.filter.FilterDummy; +import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler; +import org.apache.ambari.logfeeder.manager.InputConfigHolder; +import org.apache.ambari.logfeeder.plugin.common.AliasUtil; +import org.apache.ambari.logfeeder.plugin.input.Input; +import org.apache.ambari.logfeeder.plugin.output.Output; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputSocketDescriptor; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +/** + * Holds input/filter/output operations in cloud Log Feeder mode. + */ +public class CloudStorageInputConfigHandler implements InputConfigHandler { + + private static final Logger logger = LogManager.getLogger(CloudStorageInputConfigHandler.class); + + @Override + public void init(InputConfigHolder inputConfigHolder) { + logger.info("Call init of cloud input config handler..."); + } + + @Override + public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig inputConfig) { + for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) { + if (inputDescriptor == null) { + logger.warn("Input descriptor is smpty. Skipping..."); + continue; + } + LogFeederMode mode = inputConfigHolder.getLogFeederProps().getCloudStorageMode(); + if (inputDescriptor instanceof InputSocketDescriptor && LogFeederMode.HYBRID.equals(mode)) { + logger.info("Socket input is skipped (won't be sent to cloud storage) in hybrid mode"); + continue; + } + String source = inputDescriptor.getSource(); + if (StringUtils.isEmpty(source)) { + logger.error("Input block doesn't have source element"); + continue; + } + Input input = (Input) AliasUtil.getClassInstance(source, AliasUtil.AliasType.INPUT); + if (input == null) { + logger.error("Input object could not be found"); + continue; + } + input.setType(source); + input.setLogType(LogFeederConstants.CLOUD_PREFIX + inputDescriptor.getType()); + input.loadConfig(inputDescriptor); + FilterDummy filter = new FilterDummy(); + filter.setOutputManager(inputConfigHolder.getOutputManager()); + input.setFirstFilter(filter); + input.setCloudInput(true); + + if (input.isEnabled()) { + input.setOutputManager(inputConfigHolder.getOutputManager()); + input.setInputManager(inputConfigHolder.getInputManager()); + inputConfigHolder.getInputManager().add(serviceName, input); + logger.info("New cloud input object registered for service '{}': '{}'", serviceName, input.getLogType()); + input.logConfigs(); + } else { + logger.info("Input is disabled. So ignoring it. " + input.getShortDescription()); + } + } + } + + @Override + public void assignInputsToOutputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig config) { + for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) { + List<Output> outputs = inputConfigHolder.getOutputManager().getOutputs(); + for (Output output : outputs) { + input.addOutput(output); + } + } + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java new file mode 100644 index 0000000..44da631 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.manager.operations.impl; + +import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler; +import org.apache.ambari.logfeeder.input.InputSimulate; +import org.apache.ambari.logfeeder.manager.InputConfigHolder; +import org.apache.ambari.logfeeder.plugin.common.AliasUtil; +import org.apache.ambari.logfeeder.plugin.filter.Filter; +import org.apache.ambari.logfeeder.plugin.input.Input; +import org.apache.ambari.logfeeder.plugin.output.Output; +import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor; +import org.apache.commons.lang.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Holds input/filter/output operations in default Log Feeder mode. + */ +public class DefaultInputConfigHandler implements InputConfigHandler { + + private static final Logger logger = LogManager.getLogger(DefaultInputConfigHandler.class); + + @Override + public void init(InputConfigHolder inputConfigHolder) throws Exception { + } + + @Override + public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig inputConfig) { + loadInputs(serviceName, inputConfigHolder); + loadFilters(serviceName, inputConfigHolder); + } + + @Override + public void assignInputsToOutputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig config) { + for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) { + for (Output output : inputConfigHolder.getOutputManager().getOutputs()) { + if (input.isOutputRequired(output)) { + input.addOutput(output); + } + } + } + + // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager + for (Output output : InputSimulate.getSimulateOutputs()) { + output.setLogSearchConfig(inputConfigHolder.getConfig()); + inputConfigHolder.getOutputManager().add(output); + } + } + + private void loadInputs(String serviceName, InputConfigHolder inputConfigHolder) { + for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) { + if (inputDescriptor == null) { + logger.warn("Input descriptor is smpty. Skipping..."); + continue; + } + + String source = inputDescriptor.getSource(); + if (StringUtils.isEmpty(source)) { + logger.error("Input block doesn't have source element"); + continue; + } + Input input = (Input) AliasUtil.getClassInstance(source, AliasUtil.AliasType.INPUT); + if (input == null) { + logger.error("Input object could not be found"); + continue; + } + input.setType(source); + input.setLogType(inputDescriptor.getType()); + input.loadConfig(inputDescriptor); + + if (input.isEnabled()) { + input.setOutputManager(inputConfigHolder.getOutputManager()); + input.setInputManager(inputConfigHolder.getInputManager()); + inputConfigHolder.getInputManager().add(serviceName, input); + logger.info("New input object registered for service '{}': '{}'", serviceName, input.getLogType()); + input.logConfigs(); + } else { + logger.info("Input is disabled. So ignoring it. " + input.getShortDescription()); + } + } + } + + private void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) { + sortFilters(inputConfigHolder); + + List<Input> toRemoveInputList = new ArrayList<>(); + for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) { + for (FilterDescriptor filterDescriptor : inputConfigHolder.getFilterConfigList()) { + if (filterDescriptor == null) { + logger.warn("Filter descriptor is smpty. Skipping..."); + continue; + } + if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) { + logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " because it is disabled"); + continue; + } + if (!input.isFilterRequired(filterDescriptor)) { + logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " for input " + input.getShortDescription()); + continue; + } + + String value = filterDescriptor.getFilter(); + if (StringUtils.isEmpty(value)) { + logger.error("Filter block doesn't have filter element"); + continue; + } + Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasUtil.AliasType.FILTER); + if (filter == null) { + logger.error("Filter object could not be found"); + continue; + } + filter.loadConfig(filterDescriptor); + filter.setInput(input); + + filter.setOutputManager(inputConfigHolder.getOutputManager()); + input.addFilter(filter); + filter.logConfigs(); + } + + if (input.getFirstFilter() == null) { + toRemoveInputList.add(input); + } + } + + for (Input toRemoveInput : toRemoveInputList) { + logger.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription()); + inputConfigHolder.getInputManager().removeInput(toRemoveInput); + } + } + + private void sortFilters(InputConfigHolder inputConfigHolder) { + Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> { + Integer o1Sort = o1.getSortOrder(); + Integer o2Sort = o2.getSortOrder(); + if (o1Sort == null || o2Sort == null) { + return 0; + } + + return o1Sort - o2Sort; + }); + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java index e72fd43..bc1510a 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java @@ -18,7 +18,7 @@ */ package org.apache.ambari.logfeeder.metrics; -import org.apache.ambari.logfeeder.common.ConfigHandler; +import org.apache.ambari.logfeeder.manager.InputConfigManager; import org.apache.ambari.logfeeder.plugin.common.MetricData; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,15 +36,15 @@ public class StatsLogger extends Thread { private long lastCheckPointCleanedMS = 0; - @Inject - private ConfigHandler configHandler; + private final InputConfigManager inputConfigManager; @Inject private MetricsManager metricsManager; - public StatsLogger() { - super("statLogger"); + public StatsLogger(String name, InputConfigManager inputConfigManager) { + super(name); setDaemon(true); + this.inputConfigManager = inputConfigManager; } @PostConstruct @@ -68,16 +68,16 @@ public class StatsLogger extends Thread { if (System.currentTimeMillis() > (lastCheckPointCleanedMS + CHECKPOINT_CLEAN_INTERVAL_MS)) { lastCheckPointCleanedMS = System.currentTimeMillis(); - configHandler.cleanCheckPointFiles(); + inputConfigManager.cleanCheckPointFiles(); } } } private void logStats() { - configHandler.logStats(); + inputConfigManager.logStats(); if (metricsManager.isMetricsEnabled()) { List<MetricData> metricsList = new ArrayList<MetricData>(); - configHandler.addMetrics(metricsList); + inputConfigManager.addMetrics(metricsList); metricsManager.useMetrics(metricsList); } } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java index 68db96a..afe1c0a 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java @@ -72,6 +72,7 @@ public class OutputManagerImpl extends OutputManager { @SuppressWarnings("unchecked") @Override public void init() throws Exception { + logger.info("Called init with default output manager."); for (Output output : outputs) { output.init(logFeederProps); } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java new file mode 100644 index 0000000..871ae93 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.output.cloud; + +import org.apache.ambari.logfeeder.conf.LogFeederProps; + +/** + * Class for creating the right cloud storage outputs based on global Log Feeder configurations + * TODO !!! + */ +public class CloudStorageFactory { + + public static CloudStorageOutput createCloudStorageOutput(LogFeederProps properties) { + return new HDFSOutput(); + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java new file mode 100644 index 0000000..561b141 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.output.cloud; + +import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.plugin.input.InputMarker; +import org.apache.ambari.logfeeder.plugin.output.Output; + +/** + * Class to handle common operations for cloud storage outputs + * TODO !!! + */ +public abstract class CloudStorageOutput extends Output<LogFeederProps, InputMarker> { +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java new file mode 100644 index 0000000..4994eb7 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.output.cloud; + +import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.plugin.common.MetricData; +import org.apache.ambari.logfeeder.plugin.input.InputMarker; +import org.apache.ambari.logfeeder.plugin.manager.OutputManager; +import org.apache.ambari.logfeeder.plugin.output.Output; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.inject.Inject; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Handle output operations for sending cloud inputs to a cloud storage destination + * TODO !!! + */ +public class CloudStorageOutputManager extends OutputManager { + + private static final Logger logger = LogManager.getLogger(CloudStorageOutputManager.class); + + @Inject + private LogFeederProps logFeederProps; + + private CloudStorageOutput storageOutput = null; + + private List<Output> outputList = new ArrayList<>(); + + @Override + public void write(Map<String, Object> jsonObj, InputMarker marker) { + // TODO: make sense to implement this if we will support filters before calling cloud outputs + } + + @Override + public void write(String line, InputMarker marker) { + logger.info("Output: {}", line); + try { + storageOutput.write(line, marker); + } catch (Exception e) { + + } + } + + @Override + public void copyFile(File file, InputMarker marker) { + + } + + @Override + public void add(Output output) { + this.outputList.add(output); + } + + @Override + public List<Output> getOutputs() { + return this.outputList; + } + + @Override + public void init() throws Exception { + logger.info("Called init with cloud storage output manager."); + storageOutput = CloudStorageFactory.createCloudStorageOutput(logFeederProps); + storageOutput.init(logFeederProps); + add(storageOutput); + } + + @Override + public void close() { + + } + + @Override + public void logStats() { + + } + + @Override + public void addMetricsContainers(List<MetricData> metricsList) { + + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java new file mode 100644 index 0000000..24edb41 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.output.cloud; + +import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.plugin.input.InputMarker; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.File; + +/** + * HDFS cloud storage output (on-prem) + * TODO !!! + */ +public class HDFSOutput extends CloudStorageOutput { + + private Logger logger = LogManager.getLogger(HDFSOutput.class); + + @Override + public String getOutputType() { + return null; + } + + @Override + public void copyFile(File inputFile, InputMarker inputMarker) throws Exception { + } + + @Override + public void write(String line, InputMarker inputMarker) throws Exception { + inputMarker.getInput().checkIn(inputMarker); + } + + @Override + public Long getPendingCount() { + return null; + } + + @Override + public String getWriteBytesMetricName() { + return null; + } + + @Override + public void init(LogFeederProps logFeederProperties) throws Exception { + logger.info("Initialize on-prem HDFS output"); + } + + @Override + public String getShortDescription() { + return null; + } + + @Override + public String getStatMetricName() { + return null; + } +} diff --git a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties index 0fb1058..06c95f3 100644 --- a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties +++ b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties @@ -38,3 +38,6 @@ logfeeder.tmp.dir=${LOGFEEDER_RELATIVE_LOCATION:}target/tmp #logfeeder.configs.local.enabled=true #logfeeder.configs.filter.solr.enabled=true +#logfeeder.docker.registry.enabled=true +logfeeder.cloud.storage.mode=default +#logfeeder.cloud.storage.mode=cloud diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java index 63799e3..6674be1 100644 --- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java +++ b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java @@ -21,7 +21,6 @@ package org.apache.ambari.logfeeder.output; import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import java.io.File; @@ -64,7 +63,6 @@ public class OutputS3FileTest { } } - @Ignore("Until EasyMock 3.7 upgrade - waiting for release") @Test public void shouldRolloverWhenSufficientSizeIsReached() throws Exception { @@ -83,7 +81,6 @@ public class OutputS3FileTest { assertTrue(outputS3File.shouldRollover(logSpoolerContext)); } - @Ignore("Until EasyMock 3.7 upgrade - waiting for release") @Test public void shouldNotRolloverBeforeSufficientSizeIsReached() throws Exception { String thresholdSize = Long.toString(15 * 1024 * 1024L); diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java index facc77f..e070545 100644 --- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java +++ b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java @@ -37,7 +37,6 @@ public class S3UploaderTest { public static final String ACCESS_KEY_VALUE = "accessKeyValue"; public static final String SECRET_KEY_VALUE = "secretKeyValue"; - @Ignore("Until EasyMock 3.7 upgrade - waiting for release") @Test public void shouldUploadToS3ToRightBucket() { File fileToUpload = mock(File.class); @@ -66,7 +65,6 @@ public class S3UploaderTest { assertEquals("test_path/hdfs_namenode/hdfs_namenode.log.123343493473948.gz", resolvedPath); } - @Ignore("Until EasyMock 3.7 upgrade - waiting for release") @Test public void shouldCleanupLocalFilesOnSuccessfulUpload() { File fileToUpload = mock(File.class); @@ -96,7 +94,6 @@ public class S3UploaderTest { verify(compressedFile); } - @Ignore("Until EasyMock 3.7 upgrade - waiting for release") @Test public void shouldNotCleanupUncompressedFileIfNotRequired() { File fileToUpload = mock(File.class); @@ -124,7 +121,6 @@ public class S3UploaderTest { verify(compressedFile); } - @Ignore("Until EasyMock 3.7 upgrade - waiting for release") @Test public void shouldExpandVariablesInPath() { File fileToUpload = mock(File.class); diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java index 4a7b9b0..2cfe9ff 100644 --- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java +++ b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java @@ -22,7 +22,6 @@ import org.easymock.EasyMockRule; import org.easymock.LogicalOperator; import org.easymock.Mock; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -56,7 +55,6 @@ public class LogSpoolerTest { spoolDirectory = testFolder.getRoot().getAbsolutePath(); } - @Ignore("Until EasyMock 3.7 upgrade - waiting for release") @Test public void shouldSpoolEventToFile() { final PrintWriter spoolWriter = mock(PrintWriter.class); @@ -93,7 +91,6 @@ public class LogSpoolerTest { return mockFile; } - @Ignore("Until EasyMock 3.7 upgrade - waiting for release") @Test public void shouldIncrementSpooledEventsCount() { @@ -126,7 +123,6 @@ public class LogSpoolerTest { verify(rolloverCondition); } - @Ignore("Until EasyMock 3.7 upgrade - waiting for release") @Test public void shouldCloseCurrentSpoolFileOnRollOver() { final PrintWriter spoolWriter = mock(PrintWriter.class); @@ -161,7 +157,6 @@ public class LogSpoolerTest { verify(spoolWriter); } - @Ignore("Until EasyMock 3.7 upgrade - waiting for release") @Test public void shouldReinitializeFileOnRollover() { final PrintWriter spoolWriter1 = mock(PrintWriter.class); @@ -217,7 +212,6 @@ public class LogSpoolerTest { verify(spoolWriter1, spoolWriter2, rolloverCondition); } - @Ignore("Until EasyMock 3.7 upgrade - waiting for release") @Test public void shouldCallRolloverHandlerOnRollover() { final PrintWriter spoolWriter = mock(PrintWriter.class); @@ -255,7 +249,6 @@ public class LogSpoolerTest { // Rollover twice - the second rollover should work if the "rolloverInProgress" // flag is being reset correctly. Third file expectations being setup due // to auto-initialization. - @Ignore("Until EasyMock 3.7 upgrade - waiting for release") @Test public void shouldResetRolloverInProgressFlag() { final PrintWriter spoolWriter1 = mock(PrintWriter.class); @@ -329,7 +322,6 @@ public class LogSpoolerTest { verify(spoolWriter1, spoolWriter2, rolloverCondition); } - @Ignore("Until EasyMock 3.7 upgrade - waiting for release") @Test public void shouldNotRolloverZeroLengthFiles() { final PrintWriter spoolWriter = mock(PrintWriter.class); diff --git a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/LogSearch.java b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/LogSearch.java index 92c2b32..54ebf45 100644 --- a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/LogSearch.java +++ b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/LogSearch.java @@ -19,6 +19,7 @@ package org.apache.ambari.logsearch; import org.springframework.boot.Banner; +import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.data.rest.RepositoryRestMvcAutoConfiguration; import org.springframework.boot.autoconfigure.data.solr.SolrRepositoriesAutoConfiguration; @@ -47,7 +48,7 @@ public class LogSearch { new SpringApplicationBuilder(LogSearch.class) .bannerMode(Banner.Mode.OFF) .listeners(new ApplicationPidFileWriter(pidFile)) - .web(true) + .web(WebApplicationType.SERVLET) .run(args); } diff --git a/docker/test-config/logfeeder/logfeeder.properties b/docker/test-config/logfeeder/logfeeder.properties index 8371170..ffdb061 100644 --- a/docker/test-config/logfeeder/logfeeder.properties +++ b/docker/test-config/logfeeder/logfeeder.properties @@ -34,4 +34,5 @@ logfeeder.solr.core.config.name=history #logfeeder.solr.urls=http://solr:8983/solr #logfeeder.configs.local.enabled=true #logfeeder.configs.filter.solr.enabled=true -#logfeeder.configs.filter.zk.enabled=true \ No newline at end of file +#logfeeder.configs.filter.zk.enabled=true +#logfeeder.cloud.storage.mode=hybrid \ No newline at end of file