oleewere closed pull request #26: AMBARI-24833. Support for cloud logs to using 
filters + JSON output
URL: https://github.com/apache/ambari-logsearch/pull/26
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
 
b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
index f6cb519b95..12af637d8c 100644
--- 
a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
+++ 
b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
@@ -84,12 +84,7 @@ public void monitorInputConfigChanges(final 
InputConfigMonitor inputConfigMonito
     File[] inputConfigFiles = new 
File(configDir).listFiles(inputConfigFileFilter);
     if (inputConfigFiles != null) {
       for (File inputConfigFile : inputConfigFiles) {
-        String inputConfig = new 
String(Files.readAllBytes(inputConfigFile.toPath()));
-        Matcher m = serviceNamePattern.matcher(inputConfigFile.getName());
-        m.find();
-        String serviceName = m.group(1);
-        JsonElement inputConfigJson = 
JsonHelper.mergeGlobalConfigWithInputConfig(parser, inputConfig, 
globalConfigNode);
-        inputConfigMonitor.loadInputConfigs(serviceName, 
InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class));
+        tryLoadingInputConfig(inputConfigMonitor, parser, globalConfigNode, 
inputConfigFile);
       }
     }
     final FileSystem fs = FileSystems.getDefault();
@@ -100,6 +95,41 @@ public void monitorInputConfigChanges(final 
InputConfigMonitor inputConfigMonito
     executorService.submit(updater);
   }
 
+  private void tryLoadingInputConfig(InputConfigMonitor inputConfigMonitor, 
JsonParser parser, JsonArray globalConfigNode, File inputConfigFile) throws 
Exception {
+    // note: that will try to solve a intermittent issue when the input config 
json is a null string (during file generation), that process will re-try to 
process the files a few times
+    int tries = 0;
+    while(true) {
+      tries++;
+      Matcher m = serviceNamePattern.matcher(inputConfigFile.getName());
+      m.find();
+      String inputConfig = new 
String(Files.readAllBytes(inputConfigFile.toPath()));
+      String serviceName = m.group(1);
+      JsonElement inputConfigJson = null;
+      logger.info("Trying to load '{}' service input config from input file 
'{}'", serviceName, inputConfigFile.getAbsolutePath());
+      try {
+        inputConfigJson = JsonHelper.mergeGlobalConfigWithInputConfig(parser, 
inputConfig, globalConfigNode);
+      } catch (Exception e) {
+        final String errorMessage;
+        if (tries < 3) {
+          errorMessage = String.format("Cannot parse input config: %s, will 
retry in a few seconds again (tries: %s)", inputConfig, String.valueOf(tries));
+          logger.error(errorMessage, e);
+          try {
+            Thread.sleep(2000);
+          } catch (Exception ex) {
+            // skip
+          }
+          continue;
+        } else {
+          errorMessage = String.format("Cannot parse input config: %s, after 
%s tries. Will skip to processing it", inputConfig, String.valueOf(tries));
+          logger.error(errorMessage, e);
+          break;
+        }
+      }
+      inputConfigMonitor.loadInputConfigs(serviceName, 
InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class));
+      break;
+    }
+  }
+
   @Override
   public void close() throws IOException {
   }
diff --git 
a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
 
b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
index fd08e07fed..0975c395fa 100644
--- 
a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
+++ 
b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
@@ -48,6 +48,7 @@
 
   public LogLevelFilterManagerZK(Map<String, String> properties) throws 
Exception {
     this.client = LogSearchConfigZKHelper.createZKClient(properties);
+    this.client.start();
     this.serverCache = new TreeCache(client, "/");
     this.aclList = LogSearchConfigZKHelper.getAcls(properties);
     this.gson = LogSearchConfigZKHelper.createGson();
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index b5fffa829b..f9ef32d688 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -112,6 +112,7 @@
   public static final String CLOUD_STORAGE_BUCKET = 
"logfeeder.cloud.storage.bucket";
   public static final String CLOUD_STORAGE_BUCKET_BOOTSTRAP = 
"logfeeder.cloud.storage.bucket.bootstrap";
   public static final String CLOUD_STORAGE_USE_HDFS_CLIENT = 
"logfeeder.cloud.storage.use.hdfs.client";
+  public static final String CLOUD_STORAGE_USE_FILTERS = 
"logfeeder.cloud.storage.use.filters";
   public static final String CLOUD_STORAGE_CUSTOM_FS = 
"logfeeder.cloud.storage.custom.fs";
   public static final String CLOUD_STORAGE_BASE_PATH = 
"logfeeder.cloud.storage.base.path";
 
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index 83f10e497c..f2eb6c741e 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -289,6 +289,16 @@
   @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
   private String logfeederHdfsUser;
 
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_STORAGE_USE_FILTERS,
+    description = "Use filters for inputs (with filters the output format will 
be JSON)",
+    examples = {"true"},
+    defaultValue = "false",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_FILTERS + ":false}")
+  private boolean cloudStorageUseFilters;
+
   @Inject
   private LogEntryCacheConfig logEntryCacheConfig;
 
@@ -522,6 +532,14 @@ public void setCustomFs(String customFs) {
     this.customFs = customFs;
   }
 
+  public boolean isCloudStorageUseFilters() {
+    return cloudStorageUseFilters;
+  }
+
+  public void setCloudStorageUseFilters(boolean cloudStorageUseFilters) {
+    this.cloudStorageUseFilters = cloudStorageUseFilters;
+  }
+
   public String getCloudBasePath() {
     return cloudBasePath;
   }
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
new file mode 100644
index 0000000000..31bfd0d24b
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.manager.operations.impl;
+
+import org.apache.ambari.logfeeder.manager.InputConfigHolder;
+import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
+import org.apache.ambari.logfeeder.plugin.common.AliasUtil;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import 
org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Holds common operations for input config handlers
+ */
+public abstract class AbstractInputConfigHandler implements InputConfigHandler 
{
+
+  private static final Logger logger = 
LogManager.getLogger(AbstractInputConfigHandler.class);
+
+  protected void loadFilters(String serviceName, InputConfigHolder 
inputConfigHolder) {
+    sortFilters(inputConfigHolder);
+
+    List<Input> toRemoveInputList = new ArrayList<>();
+    for (Input input : 
inputConfigHolder.getInputManager().getInputList(serviceName)) {
+      for (FilterDescriptor filterDescriptor : 
inputConfigHolder.getFilterConfigList()) {
+        if (filterDescriptor == null) {
+          logger.warn("Filter descriptor is smpty. Skipping...");
+          continue;
+        }
+        if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
+          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " 
because it is disabled");
+          continue;
+        }
+        if (!input.isFilterRequired(filterDescriptor)) {
+          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " 
for input " + input.getShortDescription());
+          continue;
+        }
+
+        String value = filterDescriptor.getFilter();
+        if (StringUtils.isEmpty(value)) {
+          logger.error("Filter block doesn't have filter element");
+          continue;
+        }
+        Filter filter = (Filter) AliasUtil.getClassInstance(value, 
AliasUtil.AliasType.FILTER);
+        if (filter == null) {
+          logger.error("Filter object could not be found");
+          continue;
+        }
+        filter.loadConfig(filterDescriptor);
+        filter.setInput(input);
+
+        filter.setOutputManager(inputConfigHolder.getOutputManager());
+        input.addFilter(filter);
+        filter.logConfigs();
+      }
+
+      if (input.getFirstFilter() == null) {
+        toRemoveInputList.add(input);
+      }
+    }
+
+    for (Input toRemoveInput : toRemoveInputList) {
+      logger.warn("There are no filters, we will ignore this input. " + 
toRemoveInput.getShortDescription());
+      inputConfigHolder.getInputManager().removeInput(toRemoveInput);
+    }
+  }
+
+  protected void sortFilters(InputConfigHolder inputConfigHolder) {
+    Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> {
+      Integer o1Sort = o1.getSortOrder();
+      Integer o2Sort = o2.getSortOrder();
+      if (o1Sort == null || o2Sort == null) {
+        return 0;
+      }
+
+      return o1Sort - o2Sort;
+    });
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
index deb3a91663..ac10b2d667 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
@@ -38,7 +38,7 @@
 /**
  * Holds input/filter/output operations in cloud Log Feeder mode.
  */
-public class CloudStorageInputConfigHandler implements InputConfigHandler {
+public class CloudStorageInputConfigHandler extends AbstractInputConfigHandler 
{
 
   private static final Logger logger = 
LogManager.getLogger(CloudStorageInputConfigHandler.class);
 
@@ -49,6 +49,7 @@ public void init(InputConfigHolder inputConfigHolder) {
 
   @Override
   public void loadInputs(String serviceName, InputConfigHolder 
inputConfigHolder, InputConfig inputConfig) {
+    final boolean useFilters = 
inputConfigHolder.getLogFeederProps().isCloudStorageUseFilters();
     for (InputDescriptor inputDescriptor : 
inputConfigHolder.getInputConfigList()) {
       if (inputDescriptor == null) {
         logger.warn("Input descriptor is smpty. Skipping...");
@@ -72,9 +73,11 @@ public void loadInputs(String serviceName, InputConfigHolder 
inputConfigHolder,
       input.setType(source);
       input.setLogType(LogFeederConstants.CLOUD_PREFIX + 
inputDescriptor.getType());
       input.loadConfig(inputDescriptor);
-      FilterDummy filter = new FilterDummy();
-      filter.setOutputManager(inputConfigHolder.getOutputManager());
-      input.setFirstFilter(filter);
+      if (!useFilters) {
+        FilterDummy filter = new FilterDummy();
+        filter.setOutputManager(inputConfigHolder.getOutputManager());
+        input.setFirstFilter(filter);
+      }
       input.setCloudInput(true);
 
       if (input.isEnabled()) {
@@ -87,6 +90,9 @@ public void loadInputs(String serviceName, InputConfigHolder 
inputConfigHolder,
         logger.info("Input is disabled. So ignoring it. " + 
input.getShortDescription());
       }
     }
+    if (useFilters) {
+      loadFilters(serviceName, inputConfigHolder);
+    }
   }
 
   @Override
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
index 44da6319e4..dd0fe3e23e 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
@@ -40,7 +40,7 @@
 /**
  * Holds input/filter/output operations in default Log Feeder mode.
  */
-public class DefaultInputConfigHandler implements InputConfigHandler {
+public class DefaultInputConfigHandler extends AbstractInputConfigHandler {
 
   private static final Logger logger = 
LogManager.getLogger(DefaultInputConfigHandler.class);
 
@@ -103,64 +103,4 @@ private void loadInputs(String serviceName, 
InputConfigHolder inputConfigHolder)
       }
     }
   }
-
-  private void loadFilters(String serviceName, InputConfigHolder 
inputConfigHolder) {
-    sortFilters(inputConfigHolder);
-
-    List<Input> toRemoveInputList = new ArrayList<>();
-    for (Input input : 
inputConfigHolder.getInputManager().getInputList(serviceName)) {
-      for (FilterDescriptor filterDescriptor : 
inputConfigHolder.getFilterConfigList()) {
-        if (filterDescriptor == null) {
-          logger.warn("Filter descriptor is smpty. Skipping...");
-          continue;
-        }
-        if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
-          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " 
because it is disabled");
-          continue;
-        }
-        if (!input.isFilterRequired(filterDescriptor)) {
-          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " 
for input " + input.getShortDescription());
-          continue;
-        }
-
-        String value = filterDescriptor.getFilter();
-        if (StringUtils.isEmpty(value)) {
-          logger.error("Filter block doesn't have filter element");
-          continue;
-        }
-        Filter filter = (Filter) AliasUtil.getClassInstance(value, 
AliasUtil.AliasType.FILTER);
-        if (filter == null) {
-          logger.error("Filter object could not be found");
-          continue;
-        }
-        filter.loadConfig(filterDescriptor);
-        filter.setInput(input);
-
-        filter.setOutputManager(inputConfigHolder.getOutputManager());
-        input.addFilter(filter);
-        filter.logConfigs();
-      }
-
-      if (input.getFirstFilter() == null) {
-        toRemoveInputList.add(input);
-      }
-    }
-
-    for (Input toRemoveInput : toRemoveInputList) {
-      logger.warn("There are no filters, we will ignore this input. " + 
toRemoveInput.getShortDescription());
-      inputConfigHolder.getInputManager().removeInput(toRemoveInput);
-    }
-  }
-
-  private void sortFilters(InputConfigHolder inputConfigHolder) {
-    Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> {
-      Integer o1Sort = o1.getSortOrder();
-      Integer o2Sort = o2.getSortOrder();
-      if (o1Sort == null || o2Sort == null) {
-        return 0;
-      }
-
-      return o1Sort - o2Sort;
-    });
-  }
 }
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
new file mode 100644
index 0000000000..bd9e3df213
--- /dev/null
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output;
+
+import com.google.common.hash.Hashing;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility class for fill output with other fields
+ */
+public class OutputLineEnricher {
+
+  private static final Logger logger = 
LogManager.getLogger(OutputLineEnricher.class);
+
+  private static final int MAX_OUTPUT_SIZE = 32765; // 32766-1
+
+  public void enrichFields(final Map<String, Object> jsonObj, final 
InputMarker inputMarker, final MetricData messageTruncateMetric) {
+    Input input = inputMarker.getInput();
+    // Update the block with the context fields
+    for (Map.Entry<String, String> entry : 
input.getInputDescriptor().getAddFields().entrySet()) {
+      if (jsonObj.get(entry.getKey()) == null || 
entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) 
{
+        jsonObj.put(entry.getKey(), entry.getValue());
+      }
+    }
+    // TODO: Ideally most of the overrides should be configurable
+    LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, true);
+    if (input.isUseEventMD5() || input.isGenEventMD5()) {
+      String prefix = "";
+      Object logtimeObj = jsonObj.get("logtime");
+      if (logtimeObj != null) {
+        if (logtimeObj instanceof Date) {
+          prefix = "" + ((Date) logtimeObj).getTime();
+        } else {
+          prefix = logtimeObj.toString();
+        }
+      }
+      byte[] bytes = LogFeederUtil.getGson().toJson(jsonObj).getBytes();
+      long eventMD5 = Hashing.md5().hashBytes(bytes).asLong();
+      if (input.isGenEventMD5()) {
+        jsonObj.put("event_md5", prefix + Long.toString(eventMD5));
+      }
+      if (input.isUseEventMD5()) {
+        jsonObj.put("id", prefix + Long.toString(eventMD5));
+      }
+    }
+    jsonObj.computeIfAbsent("event_count", k -> 1);
+    if (StringUtils.isNotBlank(input.getInputDescriptor().getGroup())) {
+      jsonObj.put("group", input.getInputDescriptor().getGroup());
+    }
+    if (inputMarker.getAllProperties().containsKey("line_number") &&
+      (Integer) inputMarker.getAllProperties().get("line_number") > 0) {
+      jsonObj.put("logfile_line_number", 
inputMarker.getAllProperties().get("line_number"));
+    }
+    if (jsonObj.containsKey("log_message")) {
+      // TODO: Let's check size only for log_message for now
+      String logMessage = (String) jsonObj.get("log_message");
+      logMessage = truncateLongLogMessage(messageTruncateMetric, jsonObj, 
input, logMessage);
+      jsonObj.put("message_md5", "" + 
Hashing.md5().hashBytes(logMessage.getBytes()).asLong());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private String truncateLongLogMessage(MetricData messageTruncateMetric, 
Map<String, Object> jsonObj, Input input, String logMessage) {
+    if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
+      messageTruncateMetric.value++;
+      String logMessageKey = 
input.getOutputManager().getClass().getSimpleName() + "_MESSAGESIZE";
+      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too 
big. size=" + logMessage.getBytes().length +
+        ", input=" + input.getShortDescription() + ". Truncating to " + 
MAX_OUTPUT_SIZE + ", first upto 200 characters=" +
+        StringUtils.abbreviate(logMessage, 200), null, logger, Level.WARN);
+      logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE);
+      jsonObj.put("log_message", logMessage);
+      List<String> tagsList = (List<String>) jsonObj.get("tags");
+      if (tagsList == null) {
+        tagsList = new ArrayList<>();
+        jsonObj.put("tags", tagsList);
+      }
+      tagsList.add("error_message_truncated");
+    }
+    return logMessage;
+  }
+}
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
index afe1c0af95..b4c862d997 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
@@ -59,7 +59,8 @@
   @Inject
   private LogFeederProps logFeederProps;
 
-  private OutputLineFilter outputLineFilter = new OutputLineFilter();
+  private final OutputLineEnricher outputLineEnricher = new 
OutputLineEnricher();
+  private final OutputLineFilter outputLineFilter = new OutputLineFilter();
 
   public List<Output> getOutputs() {
     return outputs;
@@ -80,57 +81,12 @@ public void init() throws Exception {
 
   @SuppressWarnings("unchecked")
   public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
-    Input input = inputMarker.getInput();
-
-    // Update the block with the context fields
-    for (Map.Entry<String, String> entry : 
input.getInputDescriptor().getAddFields().entrySet()) {
-      if (jsonObj.get(entry.getKey()) == null || 
entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) 
{
-        jsonObj.put(entry.getKey(), entry.getValue());
-      }
-    }
-
-    // TODO: Ideally most of the overrides should be configurable
-
-    LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, true);
-    jsonObj.putIfAbsent("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
-
-    if (input.isUseEventMD5() || input.isGenEventMD5()) {
-      String prefix = "";
-      Object logtimeObj = jsonObj.get("logtime");
-      if (logtimeObj != null) {
-        if (logtimeObj instanceof Date) {
-          prefix = "" + ((Date) logtimeObj).getTime();
-        } else {
-          prefix = logtimeObj.toString();
-        }
-      }
-
-
-      byte[] bytes = LogFeederUtil.getGson().toJson(jsonObj).getBytes();
-      long eventMD5 = Hashing.md5().hashBytes(bytes).asLong();
-      if (input.isGenEventMD5()) {
-        jsonObj.put("event_md5", prefix + Long.toString(eventMD5));
-      }
-      if (input.isUseEventMD5()) {
-        jsonObj.put("id", prefix + Long.toString(eventMD5));
-      }
-    }
-
     jsonObj.put("seq_num", docCounter++);
-    jsonObj.computeIfAbsent("event_count", k -> 1);
-    if (StringUtils.isNotBlank(input.getInputDescriptor().getGroup())) {
-      jsonObj.put("group", input.getInputDescriptor().getGroup());
-    }
-    if (inputMarker.getAllProperties().containsKey("line_number") &&
-      (Integer) inputMarker.getAllProperties().get("line_number") > 0) {
-      jsonObj.put("logfile_line_number", 
inputMarker.getAllProperties().get("line_number"));
-    }
-    if (jsonObj.containsKey("log_message")) {
-      // TODO: Let's check size only for log_message for now
-      String logMessage = (String) jsonObj.get("log_message");
-      logMessage = truncateLongLogMessage(jsonObj, input, logMessage);
-      jsonObj.put("message_md5", "" + 
Hashing.md5().hashBytes(logMessage.getBytes()).asLong());
+    if (docCounter == Long.MIN_VALUE) {
+      docCounter = 1;
     }
+    outputLineEnricher.enrichFields(jsonObj, inputMarker, 
messageTruncateMetric);
+    Input input = inputMarker.getInput();
     List<String> defaultLogLevels = getDefaultLogLevels(input);
     if (logLevelFilterHandler.isAllowed(jsonObj, inputMarker, defaultLogLevels)
       && !outputLineFilter.apply(jsonObj, inputMarker.getInput())) {
@@ -158,26 +114,6 @@ public void write(Map<String, Object> jsonObj, InputMarker 
inputMarker) {
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private String truncateLongLogMessage(Map<String, Object> jsonObj, Input 
input, String logMessage) {
-    if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
-      messageTruncateMetric.value++;
-      String logMessageKey = this.getClass().getSimpleName() + "_MESSAGESIZE";
-      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too 
big. size=" + logMessage.getBytes().length +
-        ", input=" + input.getShortDescription() + ". Truncating to " + 
MAX_OUTPUT_SIZE + ", first upto 100 characters=" +
-        StringUtils.abbreviate(logMessage, 100), null, logger, Level.WARN);
-      logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE);
-      jsonObj.put("log_message", logMessage);
-      List<String> tagsList = (List<String>) jsonObj.get("tags");
-      if (tagsList == null) {
-        tagsList = new ArrayList<String>();
-        jsonObj.put("tags", tagsList);
-      }
-      tagsList.add("error_message_truncated");
-    }
-    return logMessage;
-  }
-
   @SuppressWarnings("unchecked")
   public void write(String jsonBlock, InputMarker inputMarker) {
     List<String> defaultLogLevels = 
getDefaultLogLevels(inputMarker.getInput());
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
index 8201051655..0cfdbcc1e2 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
@@ -48,8 +48,11 @@
   private static final String ARCHIVED_FOLDER = "archived";
   private static final String DATE_PATTERN_SUFFIX_GZ = 
"-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log.gz";
   private static final String DATE_PATTERN_SUFFIX = 
"-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log";
+  private static final String JSON_DATE_PATTERN_SUFFIX_GZ = 
"-%d{yyyy-MM-dd-HH-mm-ss-SSS}.json.gz";
+  private static final String JSON_DATE_PATTERN_SUFFIX = 
"-%d{yyyy-MM-dd-HH-mm-ss-SSS}.json";
 
   public static Logger createLogger(Input input, LoggerContext loggerContext, 
LogFeederProps logFeederProps) {
+    boolean useJsonFormat = logFeederProps.isCloudStorageUseFilters();
     String type = input.getLogType().replace(LogFeederConstants.CLOUD_PREFIX, 
"");
     String uniqueThreadName = input.getThread().getName();
     Configuration config = loggerContext.getConfiguration();
@@ -59,8 +62,15 @@ public static Logger createLogger(Input input, LoggerContext 
loggerContext, LogF
     String archiveLogDir = Paths.get(baseDir, destination, ARCHIVED_FOLDER, 
type).toFile().getAbsolutePath();
 
     boolean useGzip = logFeederProps.getRolloverConfig().isUseGzip();
-    String archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : 
DATE_PATTERN_SUFFIX;
-    String fileName = String.join(File.separator, activeLogDir, type + ".log");
+    final String archiveFilePattern;
+    if (useJsonFormat) {
+      archiveFilePattern = useGzip ? JSON_DATE_PATTERN_SUFFIX_GZ : 
JSON_DATE_PATTERN_SUFFIX;
+    } else {
+      archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : 
DATE_PATTERN_SUFFIX;
+    }
+
+    String logSuffix = useJsonFormat ? ".json" : ".log";
+    String fileName = String.join(File.separator, activeLogDir, type + 
logSuffix);
     String filePattern = String.join(File.separator, archiveLogDir, type + 
archiveFilePattern);
     PatternLayout layout = PatternLayout.newBuilder()
       .withPattern(PatternLayout.DEFAULT_CONVERSION_PATTERN).build();
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
index 16b7e5503d..9be30a066e 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
@@ -18,7 +18,10 @@
  */
 package org.apache.ambari.logfeeder.output.cloud;
 
+import org.apache.ambari.logfeeder.common.IdGeneratorHelper;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.output.OutputLineEnricher;
+import org.apache.ambari.logfeeder.output.OutputLineFilter;
 import org.apache.ambari.logfeeder.plugin.common.MetricData;
 import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.input.InputMarker;
@@ -33,6 +36,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Handle output operations for sending cloud inputs to a cloud storage 
destination
@@ -47,10 +51,25 @@
   private CloudStorageOutput storageOutput = null;
 
   private List<Output> outputList = new ArrayList<>();
+  private final AtomicBoolean useFilters = new AtomicBoolean(false);
+
+  private final MetricData messageTruncateMetric = new MetricData(null, false);
+  private final OutputLineEnricher outputLineEnricher = new 
OutputLineEnricher();
+  private final OutputLineFilter outputLineFilter = new OutputLineFilter();
 
   @Override
   public void write(Map<String, Object> jsonObj, InputMarker marker) {
-    write(LogFeederUtil.getGson().toJson(jsonObj), marker);
+    if (useFilters.get()) {
+      outputLineEnricher.enrichFields(jsonObj, marker, messageTruncateMetric);
+      if (!outputLineFilter.apply(jsonObj, marker.getInput())) {
+        if (jsonObj.get("id") == null) {
+          jsonObj.put("id", IdGeneratorHelper.generateUUID(jsonObj, 
storageOutput.getIdFields()));
+        }
+        write(LogFeederUtil.getGson().toJson(jsonObj), marker);
+      }
+    } else {
+      write(LogFeederUtil.getGson().toJson(jsonObj), marker);
+    }
   }
 
   @Override
@@ -82,6 +101,12 @@ public void init() throws Exception {
     storageOutput = new CloudStorageOutput(logFeederProps);
     storageOutput.init(logFeederProps);
     add(storageOutput);
+    useFilters.set(logFeederProps.isCloudStorageUseFilters());
+    if (useFilters.get()) {
+      logger.info("Using filters are enabled for cloud log outputs");
+    } else {
+      logger.info("Using filters are disabled for cloud log outputs");
+    }
   }
 
   @Override
diff --git 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
index b76f441eea..af9326aed4 100644
--- 
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
+++ 
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
@@ -76,7 +76,7 @@ void doUpload() {
     try {
       final String archiveLogDir = String.join(File.separator, 
logFeederProps.getRolloverConfig().getRolloverArchiveBaseDir(), uploaderType, 
"archived");
       if (new File(archiveLogDir).exists()) {
-        String[] extensions = {"log", "gz"};
+        String[] extensions = {"log", "json", "gz"};
         Collection<File> filesToUpload = FileUtils.listFiles(new 
File(archiveLogDir), extensions, true);
         if (filesToUpload.isEmpty()) {
           logger.debug("Not found any files to upload.");
diff --git a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties 
b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index c7ea335b25..45c05f39cf 100644
--- a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -47,6 +47,7 @@ logfeeder.cloud.storage.uploader.interval.seconds=1
 logfeeder.cloud.storage.upload.on.shutdown=true
 logfeeder.cloud.storage.base.path=/apps/logfeeder
 logfeeder.cloud.storage.use.hdfs.client=true
+logfeeder.cloud.storage.use.filters=false
 
 logfeeder.cloud.storage.bucket=logfeeder
 logfeeder.cloud.storage.bucket.bootstrap=true


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to