oleewere closed pull request #26: AMBARI-24833. Support for cloud logs to using filters + JSON output URL: https://github.com/apache/ambari-logsearch/pull/26
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java index f6cb519b95..12af637d8c 100644 --- a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java +++ b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java @@ -84,12 +84,7 @@ public void monitorInputConfigChanges(final InputConfigMonitor inputConfigMonito File[] inputConfigFiles = new File(configDir).listFiles(inputConfigFileFilter); if (inputConfigFiles != null) { for (File inputConfigFile : inputConfigFiles) { - String inputConfig = new String(Files.readAllBytes(inputConfigFile.toPath())); - Matcher m = serviceNamePattern.matcher(inputConfigFile.getName()); - m.find(); - String serviceName = m.group(1); - JsonElement inputConfigJson = JsonHelper.mergeGlobalConfigWithInputConfig(parser, inputConfig, globalConfigNode); - inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class)); + tryLoadingInputConfig(inputConfigMonitor, parser, globalConfigNode, inputConfigFile); } } final FileSystem fs = FileSystems.getDefault(); @@ -100,6 +95,41 @@ public void monitorInputConfigChanges(final InputConfigMonitor inputConfigMonito executorService.submit(updater); } + private void tryLoadingInputConfig(InputConfigMonitor inputConfigMonitor, JsonParser parser, JsonArray globalConfigNode, File inputConfigFile) throws Exception { + // note: that will try to solve a intermittent issue 
when the input config json is a null string (during file generation), that process will re-try to process the files a few times + int tries = 0; + while(true) { + tries++; + Matcher m = serviceNamePattern.matcher(inputConfigFile.getName()); + m.find(); + String inputConfig = new String(Files.readAllBytes(inputConfigFile.toPath())); + String serviceName = m.group(1); + JsonElement inputConfigJson = null; + logger.info("Trying to load '{}' service input config from input file '{}'", serviceName, inputConfigFile.getAbsolutePath()); + try { + inputConfigJson = JsonHelper.mergeGlobalConfigWithInputConfig(parser, inputConfig, globalConfigNode); + } catch (Exception e) { + final String errorMessage; + if (tries < 3) { + errorMessage = String.format("Cannot parse input config: %s, will retry in a few seconds again (tries: %s)", inputConfig, String.valueOf(tries)); + logger.error(errorMessage, e); + try { + Thread.sleep(2000); + } catch (Exception ex) { + // skip + } + continue; + } else { + errorMessage = String.format("Cannot parse input config: %s, after %s tries. 
Will skip to processing it", inputConfig, String.valueOf(tries)); + logger.error(errorMessage, e); + break; + } + } + inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class)); + break; + } + } + @Override public void close() throws IOException { } diff --git a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java index fd08e07fed..0975c395fa 100644 --- a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java +++ b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java @@ -48,6 +48,7 @@ public LogLevelFilterManagerZK(Map<String, String> properties) throws Exception { this.client = LogSearchConfigZKHelper.createZKClient(properties); + this.client.start(); this.serverCache = new TreeCache(client, "/"); this.aclList = LogSearchConfigZKHelper.getAcls(properties); this.gson = LogSearchConfigZKHelper.createGson(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java index b5fffa829b..f9ef32d688 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java @@ -112,6 +112,7 @@ public static final String CLOUD_STORAGE_BUCKET = "logfeeder.cloud.storage.bucket"; public static final String CLOUD_STORAGE_BUCKET_BOOTSTRAP = "logfeeder.cloud.storage.bucket.bootstrap"; public static final String CLOUD_STORAGE_USE_HDFS_CLIENT = "logfeeder.cloud.storage.use.hdfs.client"; + public static 
final String CLOUD_STORAGE_USE_FILTERS = "logfeeder.cloud.storage.use.filters"; public static final String CLOUD_STORAGE_CUSTOM_FS = "logfeeder.cloud.storage.custom.fs"; public static final String CLOUD_STORAGE_BASE_PATH = "logfeeder.cloud.storage.base.path"; diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java index 83f10e497c..f2eb6c741e 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java @@ -289,6 +289,16 @@ @Value("${"+ LogFeederConstants.HDFS_USER + ":}") private String logfeederHdfsUser; + @LogSearchPropertyDescription( + name = LogFeederConstants.CLOUD_STORAGE_USE_FILTERS, + description = "Use filters for inputs (with filters the output format will be JSON)", + examples = {"true"}, + defaultValue = "false", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_FILTERS + ":false}") + private boolean cloudStorageUseFilters; + @Inject private LogEntryCacheConfig logEntryCacheConfig; @@ -522,6 +532,14 @@ public void setCustomFs(String customFs) { this.customFs = customFs; } + public boolean isCloudStorageUseFilters() { + return cloudStorageUseFilters; + } + + public void setCloudStorageUseFilters(boolean cloudStorageUseFilters) { + this.cloudStorageUseFilters = cloudStorageUseFilters; + } + public String getCloudBasePath() { return cloudBasePath; } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java new file mode 100644 index 0000000000..31bfd0d24b --- /dev/null +++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.manager.operations.impl; + +import org.apache.ambari.logfeeder.manager.InputConfigHolder; +import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler; +import org.apache.ambari.logfeeder.plugin.common.AliasUtil; +import org.apache.ambari.logfeeder.plugin.filter.Filter; +import org.apache.ambari.logfeeder.plugin.input.Input; +import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; +import org.apache.commons.lang.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Holds common operations for input config handlers + */ +public abstract class AbstractInputConfigHandler implements InputConfigHandler { + + private static final Logger logger = LogManager.getLogger(AbstractInputConfigHandler.class); + + protected void loadFilters(String serviceName, InputConfigHolder 
inputConfigHolder) { + sortFilters(inputConfigHolder); + + List<Input> toRemoveInputList = new ArrayList<>(); + for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) { + for (FilterDescriptor filterDescriptor : inputConfigHolder.getFilterConfigList()) { + if (filterDescriptor == null) { + logger.warn("Filter descriptor is smpty. Skipping..."); + continue; + } + if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) { + logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " because it is disabled"); + continue; + } + if (!input.isFilterRequired(filterDescriptor)) { + logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " for input " + input.getShortDescription()); + continue; + } + + String value = filterDescriptor.getFilter(); + if (StringUtils.isEmpty(value)) { + logger.error("Filter block doesn't have filter element"); + continue; + } + Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasUtil.AliasType.FILTER); + if (filter == null) { + logger.error("Filter object could not be found"); + continue; + } + filter.loadConfig(filterDescriptor); + filter.setInput(input); + + filter.setOutputManager(inputConfigHolder.getOutputManager()); + input.addFilter(filter); + filter.logConfigs(); + } + + if (input.getFirstFilter() == null) { + toRemoveInputList.add(input); + } + } + + for (Input toRemoveInput : toRemoveInputList) { + logger.warn("There are no filters, we will ignore this input. 
" + toRemoveInput.getShortDescription()); + inputConfigHolder.getInputManager().removeInput(toRemoveInput); + } + } + + protected void sortFilters(InputConfigHolder inputConfigHolder) { + Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> { + Integer o1Sort = o1.getSortOrder(); + Integer o2Sort = o2.getSortOrder(); + if (o1Sort == null || o2Sort == null) { + return 0; + } + + return o1Sort - o2Sort; + }); + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java index deb3a91663..ac10b2d667 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java @@ -38,7 +38,7 @@ /** * Holds input/filter/output operations in cloud Log Feeder mode. */ -public class CloudStorageInputConfigHandler implements InputConfigHandler { +public class CloudStorageInputConfigHandler extends AbstractInputConfigHandler { private static final Logger logger = LogManager.getLogger(CloudStorageInputConfigHandler.class); @@ -49,6 +49,7 @@ public void init(InputConfigHolder inputConfigHolder) { @Override public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig inputConfig) { + final boolean useFilters = inputConfigHolder.getLogFeederProps().isCloudStorageUseFilters(); for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) { if (inputDescriptor == null) { logger.warn("Input descriptor is smpty. 
Skipping..."); @@ -72,9 +73,11 @@ public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, input.setType(source); input.setLogType(LogFeederConstants.CLOUD_PREFIX + inputDescriptor.getType()); input.loadConfig(inputDescriptor); - FilterDummy filter = new FilterDummy(); - filter.setOutputManager(inputConfigHolder.getOutputManager()); - input.setFirstFilter(filter); + if (!useFilters) { + FilterDummy filter = new FilterDummy(); + filter.setOutputManager(inputConfigHolder.getOutputManager()); + input.setFirstFilter(filter); + } input.setCloudInput(true); if (input.isEnabled()) { @@ -87,6 +90,9 @@ public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, logger.info("Input is disabled. So ignoring it. " + input.getShortDescription()); } } + if (useFilters) { + loadFilters(serviceName, inputConfigHolder); + } } @Override diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java index 44da6319e4..dd0fe3e23e 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java @@ -40,7 +40,7 @@ /** * Holds input/filter/output operations in default Log Feeder mode. 
*/ -public class DefaultInputConfigHandler implements InputConfigHandler { +public class DefaultInputConfigHandler extends AbstractInputConfigHandler { private static final Logger logger = LogManager.getLogger(DefaultInputConfigHandler.class); @@ -103,64 +103,4 @@ private void loadInputs(String serviceName, InputConfigHolder inputConfigHolder) } } } - - private void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) { - sortFilters(inputConfigHolder); - - List<Input> toRemoveInputList = new ArrayList<>(); - for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) { - for (FilterDescriptor filterDescriptor : inputConfigHolder.getFilterConfigList()) { - if (filterDescriptor == null) { - logger.warn("Filter descriptor is smpty. Skipping..."); - continue; - } - if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) { - logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " because it is disabled"); - continue; - } - if (!input.isFilterRequired(filterDescriptor)) { - logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " for input " + input.getShortDescription()); - continue; - } - - String value = filterDescriptor.getFilter(); - if (StringUtils.isEmpty(value)) { - logger.error("Filter block doesn't have filter element"); - continue; - } - Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasUtil.AliasType.FILTER); - if (filter == null) { - logger.error("Filter object could not be found"); - continue; - } - filter.loadConfig(filterDescriptor); - filter.setInput(input); - - filter.setOutputManager(inputConfigHolder.getOutputManager()); - input.addFilter(filter); - filter.logConfigs(); - } - - if (input.getFirstFilter() == null) { - toRemoveInputList.add(input); - } - } - - for (Input toRemoveInput : toRemoveInputList) { - logger.warn("There are no filters, we will ignore this input. 
" + toRemoveInput.getShortDescription()); - inputConfigHolder.getInputManager().removeInput(toRemoveInput); - } - } - - private void sortFilters(InputConfigHolder inputConfigHolder) { - Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> { - Integer o1Sort = o1.getSortOrder(); - Integer o2Sort = o2.getSortOrder(); - if (o1Sort == null || o2Sort == null) { - return 0; - } - - return o1Sort - o2Sort; - }); - } } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java new file mode 100644 index 0000000000..bd9e3df213 --- /dev/null +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.logfeeder.output; + +import com.google.common.hash.Hashing; +import org.apache.ambari.logfeeder.plugin.common.MetricData; +import org.apache.ambari.logfeeder.plugin.input.Input; +import org.apache.ambari.logfeeder.plugin.input.InputMarker; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +/** + * Utility class for fill output with other fields + */ +public class OutputLineEnricher { + + private static final Logger logger = LogManager.getLogger(OutputLineEnricher.class); + + private static final int MAX_OUTPUT_SIZE = 32765; // 32766-1 + + public void enrichFields(final Map<String, Object> jsonObj, final InputMarker inputMarker, final MetricData messageTruncateMetric) { + Input input = inputMarker.getInput(); + // Update the block with the context fields + for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) { + if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) { + jsonObj.put(entry.getKey(), entry.getValue()); + } + } + // TODO: Ideally most of the overrides should be configurable + LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, true); + if (input.isUseEventMD5() || input.isGenEventMD5()) { + String prefix = ""; + Object logtimeObj = jsonObj.get("logtime"); + if (logtimeObj != null) { + if (logtimeObj instanceof Date) { + prefix = "" + ((Date) logtimeObj).getTime(); + } else { + prefix = logtimeObj.toString(); + } + } + byte[] bytes = LogFeederUtil.getGson().toJson(jsonObj).getBytes(); + long eventMD5 = Hashing.md5().hashBytes(bytes).asLong(); + if (input.isGenEventMD5()) { + jsonObj.put("event_md5", prefix + 
Long.toString(eventMD5)); + } + if (input.isUseEventMD5()) { + jsonObj.put("id", prefix + Long.toString(eventMD5)); + } + } + jsonObj.computeIfAbsent("event_count", k -> 1); + if (StringUtils.isNotBlank(input.getInputDescriptor().getGroup())) { + jsonObj.put("group", input.getInputDescriptor().getGroup()); + } + if (inputMarker.getAllProperties().containsKey("line_number") && + (Integer) inputMarker.getAllProperties().get("line_number") > 0) { + jsonObj.put("logfile_line_number", inputMarker.getAllProperties().get("line_number")); + } + if (jsonObj.containsKey("log_message")) { + // TODO: Let's check size only for log_message for now + String logMessage = (String) jsonObj.get("log_message"); + logMessage = truncateLongLogMessage(messageTruncateMetric, jsonObj, input, logMessage); + jsonObj.put("message_md5", "" + Hashing.md5().hashBytes(logMessage.getBytes()).asLong()); + } + } + + @SuppressWarnings("unchecked") + private String truncateLongLogMessage(MetricData messageTruncateMetric, Map<String, Object> jsonObj, Input input, String logMessage) { + if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) { + messageTruncateMetric.value++; + String logMessageKey = input.getOutputManager().getClass().getSimpleName() + "_MESSAGESIZE"; + LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length + + ", input=" + input.getShortDescription() + ". 
Truncating to " + MAX_OUTPUT_SIZE + ", first upto 200 characters=" + + StringUtils.abbreviate(logMessage, 200), null, logger, Level.WARN); + logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE); + jsonObj.put("log_message", logMessage); + List<String> tagsList = (List<String>) jsonObj.get("tags"); + if (tagsList == null) { + tagsList = new ArrayList<>(); + jsonObj.put("tags", tagsList); + } + tagsList.add("error_message_truncated"); + } + return logMessage; + } +} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java index afe1c0af95..b4c862d997 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java @@ -59,7 +59,8 @@ @Inject private LogFeederProps logFeederProps; - private OutputLineFilter outputLineFilter = new OutputLineFilter(); + private final OutputLineEnricher outputLineEnricher = new OutputLineEnricher(); + private final OutputLineFilter outputLineFilter = new OutputLineFilter(); public List<Output> getOutputs() { return outputs; @@ -80,57 +81,12 @@ public void init() throws Exception { @SuppressWarnings("unchecked") public void write(Map<String, Object> jsonObj, InputMarker inputMarker) { - Input input = inputMarker.getInput(); - - // Update the block with the context fields - for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) { - if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) { - jsonObj.put(entry.getKey(), entry.getValue()); - } - } - - // TODO: Ideally most of the overrides should be configurable - - LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, true); - jsonObj.putIfAbsent("level", 
LogFeederConstants.LOG_LEVEL_UNKNOWN); - - if (input.isUseEventMD5() || input.isGenEventMD5()) { - String prefix = ""; - Object logtimeObj = jsonObj.get("logtime"); - if (logtimeObj != null) { - if (logtimeObj instanceof Date) { - prefix = "" + ((Date) logtimeObj).getTime(); - } else { - prefix = logtimeObj.toString(); - } - } - - - byte[] bytes = LogFeederUtil.getGson().toJson(jsonObj).getBytes(); - long eventMD5 = Hashing.md5().hashBytes(bytes).asLong(); - if (input.isGenEventMD5()) { - jsonObj.put("event_md5", prefix + Long.toString(eventMD5)); - } - if (input.isUseEventMD5()) { - jsonObj.put("id", prefix + Long.toString(eventMD5)); - } - } - jsonObj.put("seq_num", docCounter++); - jsonObj.computeIfAbsent("event_count", k -> 1); - if (StringUtils.isNotBlank(input.getInputDescriptor().getGroup())) { - jsonObj.put("group", input.getInputDescriptor().getGroup()); - } - if (inputMarker.getAllProperties().containsKey("line_number") && - (Integer) inputMarker.getAllProperties().get("line_number") > 0) { - jsonObj.put("logfile_line_number", inputMarker.getAllProperties().get("line_number")); - } - if (jsonObj.containsKey("log_message")) { - // TODO: Let's check size only for log_message for now - String logMessage = (String) jsonObj.get("log_message"); - logMessage = truncateLongLogMessage(jsonObj, input, logMessage); - jsonObj.put("message_md5", "" + Hashing.md5().hashBytes(logMessage.getBytes()).asLong()); + if (docCounter == Long.MIN_VALUE) { + docCounter = 1; } + outputLineEnricher.enrichFields(jsonObj, inputMarker, messageTruncateMetric); + Input input = inputMarker.getInput(); List<String> defaultLogLevels = getDefaultLogLevels(input); if (logLevelFilterHandler.isAllowed(jsonObj, inputMarker, defaultLogLevels) && !outputLineFilter.apply(jsonObj, inputMarker.getInput())) { @@ -158,26 +114,6 @@ public void write(Map<String, Object> jsonObj, InputMarker inputMarker) { } } - @SuppressWarnings("unchecked") - private String truncateLongLogMessage(Map<String, Object> 
jsonObj, Input input, String logMessage) { - if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) { - messageTruncateMetric.value++; - String logMessageKey = this.getClass().getSimpleName() + "_MESSAGESIZE"; - LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length + - ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 100 characters=" + - StringUtils.abbreviate(logMessage, 100), null, logger, Level.WARN); - logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE); - jsonObj.put("log_message", logMessage); - List<String> tagsList = (List<String>) jsonObj.get("tags"); - if (tagsList == null) { - tagsList = new ArrayList<String>(); - jsonObj.put("tags", tagsList); - } - tagsList.add("error_message_truncated"); - } - return logMessage; - } - @SuppressWarnings("unchecked") public void write(String jsonBlock, InputMarker inputMarker) { List<String> defaultLogLevels = getDefaultLogLevels(inputMarker.getInput()); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java index 8201051655..0cfdbcc1e2 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java @@ -48,8 +48,11 @@ private static final String ARCHIVED_FOLDER = "archived"; private static final String DATE_PATTERN_SUFFIX_GZ = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log.gz"; private static final String DATE_PATTERN_SUFFIX = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log"; + private static final String JSON_DATE_PATTERN_SUFFIX_GZ = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.json.gz"; + private static final String JSON_DATE_PATTERN_SUFFIX = 
"-%d{yyyy-MM-dd-HH-mm-ss-SSS}.json"; public static Logger createLogger(Input input, LoggerContext loggerContext, LogFeederProps logFeederProps) { + boolean useJsonFormat = logFeederProps.isCloudStorageUseFilters(); String type = input.getLogType().replace(LogFeederConstants.CLOUD_PREFIX, ""); String uniqueThreadName = input.getThread().getName(); Configuration config = loggerContext.getConfiguration(); @@ -59,8 +62,15 @@ public static Logger createLogger(Input input, LoggerContext loggerContext, LogF String archiveLogDir = Paths.get(baseDir, destination, ARCHIVED_FOLDER, type).toFile().getAbsolutePath(); boolean useGzip = logFeederProps.getRolloverConfig().isUseGzip(); - String archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : DATE_PATTERN_SUFFIX; - String fileName = String.join(File.separator, activeLogDir, type + ".log"); + final String archiveFilePattern; + if (useJsonFormat) { + archiveFilePattern = useGzip ? JSON_DATE_PATTERN_SUFFIX_GZ : JSON_DATE_PATTERN_SUFFIX; + } else { + archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : DATE_PATTERN_SUFFIX; + } + + String logSuffix = useJsonFormat ? 
".json" : ".log"; + String fileName = String.join(File.separator, activeLogDir, type + logSuffix); String filePattern = String.join(File.separator, archiveLogDir, type + archiveFilePattern); PatternLayout layout = PatternLayout.newBuilder() .withPattern(PatternLayout.DEFAULT_CONVERSION_PATTERN).build(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java index 16b7e5503d..9be30a066e 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java @@ -18,7 +18,10 @@ */ package org.apache.ambari.logfeeder.output.cloud; +import org.apache.ambari.logfeeder.common.IdGeneratorHelper; import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.output.OutputLineEnricher; +import org.apache.ambari.logfeeder.output.OutputLineFilter; import org.apache.ambari.logfeeder.plugin.common.MetricData; import org.apache.ambari.logfeeder.plugin.input.Input; import org.apache.ambari.logfeeder.plugin.input.InputMarker; @@ -33,6 +36,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** * Handle output operations for sending cloud inputs to a cloud storage destination @@ -47,10 +51,25 @@ private CloudStorageOutput storageOutput = null; private List<Output> outputList = new ArrayList<>(); + private final AtomicBoolean useFilters = new AtomicBoolean(false); + + private final MetricData messageTruncateMetric = new MetricData(null, false); + private final OutputLineEnricher outputLineEnricher = new OutputLineEnricher(); + private final OutputLineFilter outputLineFilter = new OutputLineFilter(); @Override public void 
write(Map<String, Object> jsonObj, InputMarker marker) { - write(LogFeederUtil.getGson().toJson(jsonObj), marker); + if (useFilters.get()) { + outputLineEnricher.enrichFields(jsonObj, marker, messageTruncateMetric); + if (!outputLineFilter.apply(jsonObj, marker.getInput())) { + if (jsonObj.get("id") == null) { + jsonObj.put("id", IdGeneratorHelper.generateUUID(jsonObj, storageOutput.getIdFields())); + } + write(LogFeederUtil.getGson().toJson(jsonObj), marker); + } + } else { + write(LogFeederUtil.getGson().toJson(jsonObj), marker); + } } @Override @@ -82,6 +101,12 @@ public void init() throws Exception { storageOutput = new CloudStorageOutput(logFeederProps); storageOutput.init(logFeederProps); add(storageOutput); + useFilters.set(logFeederProps.isCloudStorageUseFilters()); + if (useFilters.get()) { + logger.info("Using filters are enabled for cloud log outputs"); + } else { + logger.info("Using filters are disabled for cloud log outputs"); + } } @Override diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java index b76f441eea..af9326aed4 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java @@ -76,7 +76,7 @@ void doUpload() { try { final String archiveLogDir = String.join(File.separator, logFeederProps.getRolloverConfig().getRolloverArchiveBaseDir(), uploaderType, "archived"); if (new File(archiveLogDir).exists()) { - String[] extensions = {"log", "gz"}; + String[] extensions = {"log", "json", "gz"}; Collection<File> filesToUpload = FileUtils.listFiles(new File(archiveLogDir), extensions, true); if (filesToUpload.isEmpty()) { logger.debug("Not found any files to upload."); diff --git 
a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties index c7ea335b25..45c05f39cf 100644 --- a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties +++ b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties @@ -47,6 +47,7 @@ logfeeder.cloud.storage.uploader.interval.seconds=1 logfeeder.cloud.storage.upload.on.shutdown=true logfeeder.cloud.storage.base.path=/apps/logfeeder logfeeder.cloud.storage.use.hdfs.client=true +logfeeder.cloud.storage.use.filters=false logfeeder.cloud.storage.bucket=logfeeder logfeeder.cloud.storage.bucket.bootstrap=true ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services