This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git


The following commit(s) were added to refs/heads/master by this push:
     new 72b3dd6  AMBARI-24833. Support for cloud logs to use filters + JSON output (#26)
72b3dd6 is described below

commit 72b3dd6d39d233db5b02536b3c6a3215971f6cd6
Author: Olivér Szabó <oleew...@gmail.com>
AuthorDate: Mon Nov 19 10:32:15 2018 +0100

    AMBARI-24833. Support for cloud logs to use filters + JSON output (#26)
    
    * AMBARI-24833. Support for cloud logs to use filters + JSON output
    
    * AMBARI-24833. Do not filter anything if filters are not enabled
    
    * AMBARI-24833. Fix intermittent issues.
    
    * AMBARI-24833. Edit comment
---
 .../local/LogSearchConfigLogFeederLocal.java       |  42 ++++++--
 .../config/zookeeper/LogLevelFilterManagerZK.java  |   1 +
 .../logfeeder/common/LogFeederConstants.java       |   1 +
 .../ambari/logfeeder/conf/LogFeederProps.java      |  18 ++++
 ...andler.java => AbstractInputConfigHandler.java} |  84 ++--------------
 .../impl/CloudStorageInputConfigHandler.java       |  14 ++-
 .../operations/impl/DefaultInputConfigHandler.java |  62 +-----------
 .../logfeeder/output/OutputLineEnricher.java       | 109 +++++++++++++++++++++
 .../ambari/logfeeder/output/OutputManagerImpl.java |  76 ++------------
 .../output/cloud/CloudStorageLoggerFactory.java    |  14 ++-
 .../output/cloud/CloudStorageOutputManager.java    |  27 ++++-
 .../output/cloud/CloudStorageUploader.java         |   2 +-
 .../src/main/resources/logfeeder.properties        |   1 +
 13 files changed, 232 insertions(+), 219 deletions(-)

diff --git a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
index f6cb519..12af637 100644
--- a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
+++ b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
@@ -84,12 +84,7 @@ public class LogSearchConfigLogFeederLocal extends LogSearchConfigLocal implemen
     File[] inputConfigFiles = new File(configDir).listFiles(inputConfigFileFilter);
     if (inputConfigFiles != null) {
       for (File inputConfigFile : inputConfigFiles) {
-        String inputConfig = new String(Files.readAllBytes(inputConfigFile.toPath()));
-        Matcher m = serviceNamePattern.matcher(inputConfigFile.getName());
-        m.find();
-        String serviceName = m.group(1);
-        JsonElement inputConfigJson = JsonHelper.mergeGlobalConfigWithInputConfig(parser, inputConfig, globalConfigNode);
-        inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class));
+        tryLoadingInputConfig(inputConfigMonitor, parser, globalConfigNode, inputConfigFile);
       }
     }
     final FileSystem fs = FileSystems.getDefault();
@@ -100,6 +95,41 @@ public class LogSearchConfigLogFeederLocal extends LogSearchConfigLocal implemen
     executorService.submit(updater);
   }
 
+  private void tryLoadingInputConfig(InputConfigMonitor inputConfigMonitor, JsonParser parser, JsonArray globalConfigNode, File inputConfigFile) throws Exception {
+    // note: this works around an intermittent issue where the input config json can be a null string (during file generation); processing the file is re-tried a few times
+    int tries = 0;
+    while(true) {
+      tries++;
+      Matcher m = serviceNamePattern.matcher(inputConfigFile.getName());
+      m.find();
+      String inputConfig = new String(Files.readAllBytes(inputConfigFile.toPath()));
+      String serviceName = m.group(1);
+      JsonElement inputConfigJson = null;
+      logger.info("Trying to load '{}' service input config from input file '{}'", serviceName, inputConfigFile.getAbsolutePath());
+      try {
+        inputConfigJson = JsonHelper.mergeGlobalConfigWithInputConfig(parser, inputConfig, globalConfigNode);
+      } catch (Exception e) {
+        final String errorMessage;
+        if (tries < 3) {
+          errorMessage = String.format("Cannot parse input config: %s, will retry in a few seconds again (tries: %s)", inputConfig, String.valueOf(tries));
+          logger.error(errorMessage, e);
+          try {
+            Thread.sleep(2000);
+          } catch (Exception ex) {
+            // skip
+          }
+          continue;
+        } else {
+          errorMessage = String.format("Cannot parse input config: %s, after %s tries. Will skip processing it", inputConfig, String.valueOf(tries));
+          logger.error(errorMessage, e);
+          break;
+        }
+      }
+      inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class));
+      break;
+    }
+  }
+
   @Override
   public void close() throws IOException {
   }
diff --git a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
index fd08e07..0975c39 100644
--- a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
+++ b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
@@ -48,6 +48,7 @@ public class LogLevelFilterManagerZK implements LogLevelFilterManager {
 
   public LogLevelFilterManagerZK(Map<String, String> properties) throws Exception {
     this.client = LogSearchConfigZKHelper.createZKClient(properties);
+    this.client.start();
     this.serverCache = new TreeCache(client, "/");
     this.aclList = LogSearchConfigZKHelper.getAcls(properties);
     this.gson = LogSearchConfigZKHelper.createGson();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index b5fffa8..f9ef32d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -112,6 +112,7 @@ public class LogFeederConstants {
   public static final String CLOUD_STORAGE_BUCKET = "logfeeder.cloud.storage.bucket";
   public static final String CLOUD_STORAGE_BUCKET_BOOTSTRAP = "logfeeder.cloud.storage.bucket.bootstrap";
   public static final String CLOUD_STORAGE_USE_HDFS_CLIENT = "logfeeder.cloud.storage.use.hdfs.client";
+  public static final String CLOUD_STORAGE_USE_FILTERS = "logfeeder.cloud.storage.use.filters";
   public static final String CLOUD_STORAGE_CUSTOM_FS = "logfeeder.cloud.storage.custom.fs";
   public static final String CLOUD_STORAGE_BASE_PATH = "logfeeder.cloud.storage.base.path";
 
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index 83f10e4..f2eb6c7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -289,6 +289,16 @@ public class LogFeederProps implements LogFeederProperties {
   @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
   private String logfeederHdfsUser;
 
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_STORAGE_USE_FILTERS,
+    description = "Use filters for inputs (with filters the output format will be JSON)",
+    examples = {"true"},
+    defaultValue = "false",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_FILTERS + ":false}")
+  private boolean cloudStorageUseFilters;
+
   @Inject
   private LogEntryCacheConfig logEntryCacheConfig;
 
@@ -522,6 +532,14 @@ public class LogFeederProps implements LogFeederProperties {
     this.customFs = customFs;
   }
 
+  public boolean isCloudStorageUseFilters() {
+    return cloudStorageUseFilters;
+  }
+
+  public void setCloudStorageUseFilters(boolean cloudStorageUseFilters) {
+    this.cloudStorageUseFilters = cloudStorageUseFilters;
+  }
+
   public String getCloudBasePath() {
     return cloudBasePath;
   }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
similarity index 53%
copy from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
copy to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
index 44da631..31bfd0d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,93 +18,29 @@
  */
 package org.apache.ambari.logfeeder.manager.operations.impl;
 
-import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
-import org.apache.ambari.logfeeder.input.InputSimulate;
 import org.apache.ambari.logfeeder.manager.InputConfigHolder;
+import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
 import org.apache.ambari.logfeeder.plugin.common.AliasUtil;
 import org.apache.ambari.logfeeder.plugin.filter.Filter;
 import org.apache.ambari.logfeeder.plugin.input.Input;
-import org.apache.ambari.logfeeder.plugin.output.Output;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
 import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 /**
- * Holds input/filter/output operations in default Log Feeder mode.
+ * Holds common operations for input config handlers
  */
-public class DefaultInputConfigHandler implements InputConfigHandler {
-
-  private static final Logger logger = LogManager.getLogger(DefaultInputConfigHandler.class);
-
-  @Override
-  public void init(InputConfigHolder inputConfigHolder) throws Exception {
-  }
-
-  @Override
-  public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig inputConfig) {
-    loadInputs(serviceName, inputConfigHolder);
-    loadFilters(serviceName, inputConfigHolder);
-  }
-
-  @Override
-  public void assignInputsToOutputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig config) {
-    for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) {
-      for (Output output : inputConfigHolder.getOutputManager().getOutputs()) {
-        if (input.isOutputRequired(output)) {
-          input.addOutput(output);
-        }
-      }
-    }
+public abstract class AbstractInputConfigHandler implements InputConfigHandler {
 
-    // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
-    for (Output output : InputSimulate.getSimulateOutputs()) {
-      output.setLogSearchConfig(inputConfigHolder.getConfig());
-      inputConfigHolder.getOutputManager().add(output);
-    }
-  }
-
-  private void loadInputs(String serviceName, InputConfigHolder inputConfigHolder) {
-    for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
-      if (inputDescriptor == null) {
-        logger.warn("Input descriptor is smpty. Skipping...");
-        continue;
-      }
-
-      String source = inputDescriptor.getSource();
-      if (StringUtils.isEmpty(source)) {
-        logger.error("Input block doesn't have source element");
-        continue;
-      }
-      Input input = (Input) AliasUtil.getClassInstance(source, AliasUtil.AliasType.INPUT);
-      if (input == null) {
-        logger.error("Input object could not be found");
-        continue;
-      }
-      input.setType(source);
-      input.setLogType(inputDescriptor.getType());
-      input.loadConfig(inputDescriptor);
-
-      if (input.isEnabled()) {
-        input.setOutputManager(inputConfigHolder.getOutputManager());
-        input.setInputManager(inputConfigHolder.getInputManager());
-        inputConfigHolder.getInputManager().add(serviceName, input);
-        logger.info("New input object registered for service '{}': '{}'", serviceName, input.getLogType());
-        input.logConfigs();
-      } else {
-        logger.info("Input is disabled. So ignoring it. " + input.getShortDescription());
-      }
-    }
-  }
+  private static final Logger logger = LogManager.getLogger(AbstractInputConfigHandler.class);
 
-  private void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) {
+  protected void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) {
     sortFilters(inputConfigHolder);
 
     List<Input> toRemoveInputList = new ArrayList<>();
@@ -152,7 +88,7 @@ public class DefaultInputConfigHandler implements InputConfigHandler {
     }
   }
 
-  private void sortFilters(InputConfigHolder inputConfigHolder) {
+  protected void sortFilters(InputConfigHolder inputConfigHolder) {
     Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> {
       Integer o1Sort = o1.getSortOrder();
       Integer o2Sort = o2.getSortOrder();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
index deb3a91..ac10b2d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
@@ -38,7 +38,7 @@ import java.util.List;
 /**
  * Holds input/filter/output operations in cloud Log Feeder mode.
  */
-public class CloudStorageInputConfigHandler implements InputConfigHandler {
+public class CloudStorageInputConfigHandler extends AbstractInputConfigHandler {
 
   private static final Logger logger = LogManager.getLogger(CloudStorageInputConfigHandler.class);
 
@@ -49,6 +49,7 @@ public class CloudStorageInputConfigHandler implements InputConfigHandler {
 
   @Override
   public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig inputConfig) {
+    final boolean useFilters = inputConfigHolder.getLogFeederProps().isCloudStorageUseFilters();
     for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
       if (inputDescriptor == null) {
         logger.warn("Input descriptor is smpty. Skipping...");
@@ -72,9 +73,11 @@ public class CloudStorageInputConfigHandler implements InputConfigHandler {
       input.setType(source);
       input.setLogType(LogFeederConstants.CLOUD_PREFIX + inputDescriptor.getType());
       input.loadConfig(inputDescriptor);
-      FilterDummy filter = new FilterDummy();
-      filter.setOutputManager(inputConfigHolder.getOutputManager());
-      input.setFirstFilter(filter);
+      if (!useFilters) {
+        FilterDummy filter = new FilterDummy();
+        filter.setOutputManager(inputConfigHolder.getOutputManager());
+        input.setFirstFilter(filter);
+      }
       input.setCloudInput(true);
 
       if (input.isEnabled()) {
@@ -87,6 +90,9 @@ public class CloudStorageInputConfigHandler implements InputConfigHandler {
         logger.info("Input is disabled. So ignoring it. " + input.getShortDescription());
       }
     }
+    if (useFilters) {
+      loadFilters(serviceName, inputConfigHolder);
+    }
   }
 
   @Override
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
index 44da631..dd0fe3e 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
@@ -40,7 +40,7 @@ import java.util.List;
 /**
  * Holds input/filter/output operations in default Log Feeder mode.
  */
-public class DefaultInputConfigHandler implements InputConfigHandler {
+public class DefaultInputConfigHandler extends AbstractInputConfigHandler {
 
   private static final Logger logger = LogManager.getLogger(DefaultInputConfigHandler.class);
 
@@ -103,64 +103,4 @@ public class DefaultInputConfigHandler implements InputConfigHandler {
       }
     }
   }
-
-  private void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) {
-    sortFilters(inputConfigHolder);
-
-    List<Input> toRemoveInputList = new ArrayList<>();
-    for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) {
-      for (FilterDescriptor filterDescriptor : inputConfigHolder.getFilterConfigList()) {
-        if (filterDescriptor == null) {
-          logger.warn("Filter descriptor is smpty. Skipping...");
-          continue;
-        }
-        if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
-          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " because it is disabled");
-          continue;
-        }
-        if (!input.isFilterRequired(filterDescriptor)) {
-          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " for input " + input.getShortDescription());
-          continue;
-        }
-
-        String value = filterDescriptor.getFilter();
-        if (StringUtils.isEmpty(value)) {
-          logger.error("Filter block doesn't have filter element");
-          continue;
-        }
-        Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasUtil.AliasType.FILTER);
-        if (filter == null) {
-          logger.error("Filter object could not be found");
-          continue;
-        }
-        filter.loadConfig(filterDescriptor);
-        filter.setInput(input);
-
-        filter.setOutputManager(inputConfigHolder.getOutputManager());
-        input.addFilter(filter);
-        filter.logConfigs();
-      }
-
-      if (input.getFirstFilter() == null) {
-        toRemoveInputList.add(input);
-      }
-    }
-
-    for (Input toRemoveInput : toRemoveInputList) {
-      logger.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription());
-      inputConfigHolder.getInputManager().removeInput(toRemoveInput);
-    }
-  }
-
-  private void sortFilters(InputConfigHolder inputConfigHolder) {
-    Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> {
-      Integer o1Sort = o1.getSortOrder();
-      Integer o2Sort = o2.getSortOrder();
-      if (o1Sort == null || o2Sort == null) {
-        return 0;
-      }
-
-      return o1Sort - o2Sort;
-    });
-  }
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
new file mode 100644
index 0000000..bd9e3df
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output;
+
+import com.google.common.hash.Hashing;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility class for filling the output with additional fields
+ */
+public class OutputLineEnricher {
+
+  private static final Logger logger = LogManager.getLogger(OutputLineEnricher.class);
+
+  private static final int MAX_OUTPUT_SIZE = 32765; // 32766-1
+
+  public void enrichFields(final Map<String, Object> jsonObj, final InputMarker inputMarker, final MetricData messageTruncateMetric) {
+    Input input = inputMarker.getInput();
+    // Update the block with the context fields
+    for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) {
+      if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) {
+        jsonObj.put(entry.getKey(), entry.getValue());
+      }
+    }
+    // TODO: Ideally most of the overrides should be configurable
+    LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, true);
+    if (input.isUseEventMD5() || input.isGenEventMD5()) {
+      String prefix = "";
+      Object logtimeObj = jsonObj.get("logtime");
+      if (logtimeObj != null) {
+        if (logtimeObj instanceof Date) {
+          prefix = "" + ((Date) logtimeObj).getTime();
+        } else {
+          prefix = logtimeObj.toString();
+        }
+      }
+      byte[] bytes = LogFeederUtil.getGson().toJson(jsonObj).getBytes();
+      long eventMD5 = Hashing.md5().hashBytes(bytes).asLong();
+      if (input.isGenEventMD5()) {
+        jsonObj.put("event_md5", prefix + Long.toString(eventMD5));
+      }
+      if (input.isUseEventMD5()) {
+        jsonObj.put("id", prefix + Long.toString(eventMD5));
+      }
+    }
+    jsonObj.computeIfAbsent("event_count", k -> 1);
+    if (StringUtils.isNotBlank(input.getInputDescriptor().getGroup())) {
+      jsonObj.put("group", input.getInputDescriptor().getGroup());
+    }
+    if (inputMarker.getAllProperties().containsKey("line_number") &&
+      (Integer) inputMarker.getAllProperties().get("line_number") > 0) {
+      jsonObj.put("logfile_line_number", inputMarker.getAllProperties().get("line_number"));
+    }
+    if (jsonObj.containsKey("log_message")) {
+      // TODO: Let's check size only for log_message for now
+      String logMessage = (String) jsonObj.get("log_message");
+      logMessage = truncateLongLogMessage(messageTruncateMetric, jsonObj, input, logMessage);
+      jsonObj.put("message_md5", "" + Hashing.md5().hashBytes(logMessage.getBytes()).asLong());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private String truncateLongLogMessage(MetricData messageTruncateMetric, Map<String, Object> jsonObj, Input input, String logMessage) {
+    if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
+      messageTruncateMetric.value++;
+      String logMessageKey = input.getOutputManager().getClass().getSimpleName() + "_MESSAGESIZE";
+      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length +
+        ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 200 characters=" +
+        StringUtils.abbreviate(logMessage, 200), null, logger, Level.WARN);
+      logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE);
+      jsonObj.put("log_message", logMessage);
+      List<String> tagsList = (List<String>) jsonObj.get("tags");
+      if (tagsList == null) {
+        tagsList = new ArrayList<>();
+        jsonObj.put("tags", tagsList);
+      }
+      tagsList.add("error_message_truncated");
+    }
+    return logMessage;
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
index afe1c0a..b4c862d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
@@ -59,7 +59,8 @@ public class OutputManagerImpl extends OutputManager {
   @Inject
   private LogFeederProps logFeederProps;
 
-  private OutputLineFilter outputLineFilter = new OutputLineFilter();
+  private final OutputLineEnricher outputLineEnricher = new OutputLineEnricher();
+  private final OutputLineFilter outputLineFilter = new OutputLineFilter();
 
   public List<Output> getOutputs() {
     return outputs;
@@ -80,57 +81,12 @@ public class OutputManagerImpl extends OutputManager {
 
   @SuppressWarnings("unchecked")
   public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
-    Input input = inputMarker.getInput();
-
-    // Update the block with the context fields
-    for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) {
-      if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) {
-        jsonObj.put(entry.getKey(), entry.getValue());
-      }
-    }
-
-    // TODO: Ideally most of the overrides should be configurable
-
-    LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, true);
-    jsonObj.putIfAbsent("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
-
-    if (input.isUseEventMD5() || input.isGenEventMD5()) {
-      String prefix = "";
-      Object logtimeObj = jsonObj.get("logtime");
-      if (logtimeObj != null) {
-        if (logtimeObj instanceof Date) {
-          prefix = "" + ((Date) logtimeObj).getTime();
-        } else {
-          prefix = logtimeObj.toString();
-        }
-      }
-
-
-      byte[] bytes = LogFeederUtil.getGson().toJson(jsonObj).getBytes();
-      long eventMD5 = Hashing.md5().hashBytes(bytes).asLong();
-      if (input.isGenEventMD5()) {
-        jsonObj.put("event_md5", prefix + Long.toString(eventMD5));
-      }
-      if (input.isUseEventMD5()) {
-        jsonObj.put("id", prefix + Long.toString(eventMD5));
-      }
-    }
-
     jsonObj.put("seq_num", docCounter++);
-    jsonObj.computeIfAbsent("event_count", k -> 1);
-    if (StringUtils.isNotBlank(input.getInputDescriptor().getGroup())) {
-      jsonObj.put("group", input.getInputDescriptor().getGroup());
-    }
-    if (inputMarker.getAllProperties().containsKey("line_number") &&
-      (Integer) inputMarker.getAllProperties().get("line_number") > 0) {
-      jsonObj.put("logfile_line_number", inputMarker.getAllProperties().get("line_number"));
-    }
-    if (jsonObj.containsKey("log_message")) {
-      // TODO: Let's check size only for log_message for now
-      String logMessage = (String) jsonObj.get("log_message");
-      logMessage = truncateLongLogMessage(jsonObj, input, logMessage);
-      jsonObj.put("message_md5", "" + Hashing.md5().hashBytes(logMessage.getBytes()).asLong());
+    if (docCounter == Long.MIN_VALUE) {
+      docCounter = 1;
     }
+    outputLineEnricher.enrichFields(jsonObj, inputMarker, messageTruncateMetric);
+    Input input = inputMarker.getInput();
     List<String> defaultLogLevels = getDefaultLogLevels(input);
     if (logLevelFilterHandler.isAllowed(jsonObj, inputMarker, defaultLogLevels)
       && !outputLineFilter.apply(jsonObj, inputMarker.getInput())) {
@@ -159,26 +115,6 @@ public class OutputManagerImpl extends OutputManager {
 
   @SuppressWarnings("unchecked")
-  private String truncateLongLogMessage(Map<String, Object> jsonObj, Input input, String logMessage) {
-    if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
-      messageTruncateMetric.value++;
-      String logMessageKey = this.getClass().getSimpleName() + "_MESSAGESIZE";
-    LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length +
-      ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 100 characters=" +
-      StringUtils.abbreviate(logMessage, 100), null, logger, Level.WARN);
-      logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE);
-      jsonObj.put("log_message", logMessage);
-      List<String> tagsList = (List<String>) jsonObj.get("tags");
-      if (tagsList == null) {
-        tagsList = new ArrayList<String>();
-        jsonObj.put("tags", tagsList);
-      }
-      tagsList.add("error_message_truncated");
-    }
-    return logMessage;
-  }
-
-  @SuppressWarnings("unchecked")
   public void write(String jsonBlock, InputMarker inputMarker) {
     List<String> defaultLogLevels = getDefaultLogLevels(inputMarker.getInput());
     if (logLevelFilterHandler.isAllowed(jsonBlock, inputMarker, defaultLogLevels)) {
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
index 8201051..0cfdbcc 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
@@ -48,8 +48,11 @@ public class CloudStorageLoggerFactory {
   private static final String ARCHIVED_FOLDER = "archived";
   private static final String DATE_PATTERN_SUFFIX_GZ = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log.gz";
   private static final String DATE_PATTERN_SUFFIX = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log";
+  private static final String JSON_DATE_PATTERN_SUFFIX_GZ = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.json.gz";
+  private static final String JSON_DATE_PATTERN_SUFFIX = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.json";
 
   public static Logger createLogger(Input input, LoggerContext loggerContext, LogFeederProps logFeederProps) {
+    boolean useJsonFormat = logFeederProps.isCloudStorageUseFilters();
     String type = input.getLogType().replace(LogFeederConstants.CLOUD_PREFIX, "");
     String uniqueThreadName = input.getThread().getName();
     Configuration config = loggerContext.getConfiguration();
@@ -59,8 +62,15 @@ public class CloudStorageLoggerFactory {
     String archiveLogDir = Paths.get(baseDir, destination, ARCHIVED_FOLDER, type).toFile().getAbsolutePath();
 
     boolean useGzip = logFeederProps.getRolloverConfig().isUseGzip();
-    String archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : DATE_PATTERN_SUFFIX;
-    String fileName = String.join(File.separator, activeLogDir, type + ".log");
+    final String archiveFilePattern;
+    if (useJsonFormat) {
+      archiveFilePattern = useGzip ? JSON_DATE_PATTERN_SUFFIX_GZ : JSON_DATE_PATTERN_SUFFIX;
+    } else {
+      archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : DATE_PATTERN_SUFFIX;
+    }
+
+    String logSuffix = useJsonFormat ? ".json" : ".log";
+    String fileName = String.join(File.separator, activeLogDir, type + logSuffix);
     String filePattern = String.join(File.separator, archiveLogDir, type + archiveFilePattern);
     PatternLayout layout = PatternLayout.newBuilder()
       .withPattern(PatternLayout.DEFAULT_CONVERSION_PATTERN).build();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
index 16b7e55..9be30a0 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
@@ -18,7 +18,10 @@
  */
 package org.apache.ambari.logfeeder.output.cloud;
 
+import org.apache.ambari.logfeeder.common.IdGeneratorHelper;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.output.OutputLineEnricher;
+import org.apache.ambari.logfeeder.output.OutputLineFilter;
 import org.apache.ambari.logfeeder.plugin.common.MetricData;
 import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.input.InputMarker;
@@ -33,6 +36,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
 * Handle output operations for sending cloud inputs to a cloud storage destination
@@ -47,10 +51,25 @@ public class CloudStorageOutputManager extends OutputManager {
   private CloudStorageOutput storageOutput = null;
 
   private List<Output> outputList = new ArrayList<>();
+  private final AtomicBoolean useFilters = new AtomicBoolean(false);
+
+  private final MetricData messageTruncateMetric = new MetricData(null, false);
+  private final OutputLineEnricher outputLineEnricher = new OutputLineEnricher();
+  private final OutputLineFilter outputLineFilter = new OutputLineFilter();
 
   @Override
   public void write(Map<String, Object> jsonObj, InputMarker marker) {
-    write(LogFeederUtil.getGson().toJson(jsonObj), marker);
+    if (useFilters.get()) {
+      outputLineEnricher.enrichFields(jsonObj, marker, messageTruncateMetric);
+      if (!outputLineFilter.apply(jsonObj, marker.getInput())) {
+        if (jsonObj.get("id") == null) {
+          jsonObj.put("id", IdGeneratorHelper.generateUUID(jsonObj, storageOutput.getIdFields()));
+        }
+        write(LogFeederUtil.getGson().toJson(jsonObj), marker);
+      }
+    } else {
+      write(LogFeederUtil.getGson().toJson(jsonObj), marker);
+    }
   }
 
   @Override
@@ -82,6 +101,12 @@ public class CloudStorageOutputManager extends OutputManager {
     storageOutput = new CloudStorageOutput(logFeederProps);
     storageOutput.init(logFeederProps);
     add(storageOutput);
+    useFilters.set(logFeederProps.isCloudStorageUseFilters());
+    if (useFilters.get()) {
+      logger.info("Filters are enabled for cloud log outputs");
+    } else {
+      logger.info("Filters are disabled for cloud log outputs");
+    }
   }
 
   @Override
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
index b76f441..af9326a 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
@@ -76,7 +76,7 @@ public class CloudStorageUploader extends Thread {
     try {
       final String archiveLogDir = String.join(File.separator, logFeederProps.getRolloverConfig().getRolloverArchiveBaseDir(), uploaderType, "archived");
       if (new File(archiveLogDir).exists()) {
-        String[] extensions = {"log", "gz"};
+        String[] extensions = {"log", "json", "gz"};
         Collection<File> filesToUpload = FileUtils.listFiles(new File(archiveLogDir), extensions, true);
         if (filesToUpload.isEmpty()) {
           logger.debug("Not found any files to upload.");
diff --git a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index c7ea335..45c05f3 100644
--- a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -47,6 +47,7 @@ logfeeder.cloud.storage.uploader.interval.seconds=1
 logfeeder.cloud.storage.upload.on.shutdown=true
 logfeeder.cloud.storage.base.path=/apps/logfeeder
 logfeeder.cloud.storage.use.hdfs.client=true
+logfeeder.cloud.storage.use.filters=false
 
 logfeeder.cloud.storage.bucket=logfeeder
 logfeeder.cloud.storage.bucket.bootstrap=true
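
For reference, a minimal sketch of how the new switch could be used; the property name and its default come from this commit, while the other lines simply mirror values already present in logfeeder.properties and are only illustrative. With the flag enabled, the cloud-mode inputs run their configured filters and the rolled output files are written as JSON (.json/.json.gz) instead of raw .log/.log.gz:

  # logfeeder.properties (sketch; only logfeeder.cloud.storage.use.filters is new in this commit)
  logfeeder.cloud.storage.use.hdfs.client=true
  logfeeder.cloud.storage.use.filters=true
  logfeeder.cloud.storage.base.path=/apps/logfeeder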
