This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c4209e  AMBARI-22440. Log Feeder: wildcard support & checkpoint cleanup on trunk
3c4209e is described below

commit 3c4209ed6f7dc073c0e06c6587ca506a97bac4cb
Author: Oliver Szabo <[email protected]>
AuthorDate: Mon Apr 2 20:34:39 2018 +0200

    AMBARI-22440. Log Feeder: wildcard support & checkpoint cleanup on trunk
---
 .../model/inputconfig/FilterGrokDescriptor.java    |   4 +
 .../api/model/inputconfig/InputDescriptor.java     |   2 +
 .../api/model/inputconfig/InputFileDescriptor.java |   7 +
 .../inputconfig/impl/FilterGrokDescriptorImpl.java |  38 ++
 .../inputconfig/impl/InputDescriptorImpl.java      |  21 +
 .../inputconfig/impl/InputFileDescriptorImpl.java  |  83 +++
 .../ambari/logfeeder/plugin/common/ConfigItem.java |   3 +-
 .../plugin/common/LogFeederProperties.java         |   3 +-
 .../ambari/logfeeder/plugin/common/MetricData.java |   4 +-
 .../ambari/logfeeder/plugin/filter/Filter.java     |   4 +
 .../ambari/logfeeder/plugin/input/Input.java       |  15 +-
 .../logfeeder/plugin/input/cache/LRUCache.java     |   3 +-
 .../ambari-logsearch-logfeeder/pom.xml             |   5 +
 .../apache/ambari/logfeeder/filter/FilterGrok.java |  25 +-
 .../apache/ambari/logfeeder/input/InputFile.java   | 220 ++++++-
 .../ambari/logfeeder/input/InputManagerImpl.java   |  12 +
 .../logfeeder/input/file/FileCheckInHelper.java    |   4 +-
 .../input/monitor/AbstractLogFileMonitor.java      |  64 ++
 .../input/monitor/CheckpointCleanupMonitor.java    |  48 ++
 .../input/monitor/LogFileDetachMonitor.java        |  79 +++
 .../input/monitor/LogFilePathUpdateMonitor.java    |  74 +++
 .../ambari/logfeeder/output/OutputManagerImpl.java |  21 +-
 .../org/apache/ambari/logfeeder/util/FileUtil.java | 137 ++++-
 .../ambari/logfeeder/util/LogFeederUtil.java       |  24 +
 .../logsearch/model/common/LSServerFilterGrok.java |  24 +
 .../logsearch/model/common/LSServerInput.java      |   8 +
 .../logsearch/model/common/LSServerInputFile.java  |  52 ++
 .../shipper-conf/input.config-ambari.json          | 641 +++++++--------------
 .../logfeeder/shipper-conf/input.config-storm.json |  75 +++
 .../streamline-1-TestAgg-2-3/6700/worker.log       |   5 +
 .../streamline-1-TestAgg-2-3/6701/worker.log       |   5 +
 31 files changed, 1209 insertions(+), 501 deletions(-)

diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterGrokDescriptor.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterGrokDescriptor.java
index 039e1ff..9fc8eb4 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterGrokDescriptor.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterGrokDescriptor.java
@@ -27,4 +27,8 @@ public interface FilterGrokDescriptor extends 
FilterDescriptor {
   String getMessagePattern();
 
   void setMultilinePattern(String multilinePattern);
+
+  boolean isSkipOnError();
+
+  boolean isDeepExtract();
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
index d3a43c1..71f3cd1 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
@@ -51,4 +51,6 @@ public interface InputDescriptor {
   Boolean isEnabled();
 
   String getGroup();
+
+  Boolean isInitDefaultFields();
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
index 0070ad9..b58db6a 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
@@ -20,4 +20,11 @@
 package org.apache.ambari.logsearch.config.api.model.inputconfig;
 
 public interface InputFileDescriptor extends InputFileBaseDescriptor {
+  Integer getDetachIntervalMin();
+
+  Integer getDetachTimeMin();
+
+  Integer getPathUpdateIntervalMin();
+
+  Integer getMaxAgeMin();
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterGrokDescriptorImpl.java
 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterGrokDescriptorImpl.java
index e140df0..9823163 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterGrokDescriptorImpl.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterGrokDescriptorImpl.java
@@ -61,6 +61,26 @@ public class FilterGrokDescriptorImpl extends 
FilterDescriptorImpl implements Fi
   @SerializedName("message_pattern")
   private String messagePattern;
 
+  @ShipperConfigElementDescription(
+    path = "/filter/[]/skip_on_error",
+    type = "boolean",
+    description = "Skip filter if an error occurred during applying the grok 
filter.",
+    examples = {"true"}
+  )
+  @Expose
+  @SerializedName("skip_on_error")
+  private boolean skipOnError;
+
+  @ShipperConfigElementDescription(
+    path = "/filter/[]/deep_extract",
+    type = "boolean",
+    description = "",
+    examples = {""}
+  )
+  @Expose
+  @SerializedName("deep_extract")
+  private boolean deepExtract;
+
   @Override
   public String getLog4jFormat() {
     return log4jFormat;
@@ -81,6 +101,24 @@ public class FilterGrokDescriptorImpl extends 
FilterDescriptorImpl implements Fi
   }
 
   @Override
+  public boolean isSkipOnError() {
+    return this.skipOnError;
+  }
+
+  public void setSkipOnError(boolean skipOnError) {
+    this.skipOnError = skipOnError;
+  }
+
+  @Override
+  public boolean isDeepExtract() {
+    return deepExtract;
+  }
+
+  public void setDeepExtract(boolean deepExtract) {
+    this.deepExtract = deepExtract;
+  }
+
+  @Override
   public String getMessagePattern() {
     return messagePattern;
   }
diff --git 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
index 0994b9f..40886be 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
@@ -188,6 +188,18 @@ public abstract class InputDescriptorImpl implements 
InputDescriptor {
   @SerializedName("is_enabled")
   private Boolean isEnabled;
 
+
+  @ShipperConfigElementDescription(
+    path = "/input/[]/init_default_fields",
+    type = "boolean",
+    description = "Init default fields (ip, path etc.) before applying the 
filter.",
+    examples = {"true", "false"},
+    defaultValue = "false"
+  )
+  @Expose
+  @SerializedName("init_default_fields")
+  private Boolean initDefaultFields;
+
   public String getType() {
     return type;
   }
@@ -308,4 +320,13 @@ public abstract class InputDescriptorImpl implements 
InputDescriptor {
   public void setGroup(String group) {
     this.group = group;
   }
+
+  @Override
+  public Boolean isInitDefaultFields() {
+    return this.initDefaultFields;
+  }
+
+  public void setInitDefaultFields(Boolean initDefaultFields) {
+    this.initDefaultFields = initDefaultFields;
+  }
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
index 3bfd161..99b42fe 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
@@ -19,7 +19,90 @@
 
 package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
 
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+import org.apache.ambari.logsearch.config.api.ShipperConfigElementDescription;
 import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
 
 public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl 
implements InputFileDescriptor {
+
+  @ShipperConfigElementDescription(
+    path = "/input/[]/detach_interval_min",
+    type = "integer",
+    description = "The period in minutes for checking which files are too old 
(default: 300)",
+    examples = {"60"},
+    defaultValue = "1800"
+  )
+  @Expose
+  @SerializedName("detach_interval_min")
+  private Integer detachIntervalMin;
+
+  @ShipperConfigElementDescription(
+    path = "/input/[]/detach_time_min",
+    type = "integer",
+    description = "The period in minutes when the application flags a file is 
too old (default: 2000)",
+    examples = {"60"},
+    defaultValue = "2000"
+  )
+  @Expose
+  @SerializedName("detach_time_min")
+  private Integer detachTimeMin;
+
+  @ShipperConfigElementDescription(
+    path = "/input/[]/path_update_interval_min",
+    type = "integer",
+    description = "The period in minutes for checking new files (default: 5, 
based on detach values, its possible that a new input wont be monitored)",
+    examples = {"5"},
+    defaultValue = "5"
+  )
+  @Expose
+  @SerializedName("path_update_interval_min")
+  private Integer pathUpdateIntervalMin;
+
+  @ShipperConfigElementDescription(
+    path = "/input/[]/max_age_min",
+    type = "integer",
+    description = "If the file has not modified for long (this time value in 
minutes), then the checkpoint file can be deleted.",
+    examples = {"2000"},
+    defaultValue = "0"
+  )
+  @Expose
+  @SerializedName("max_age_min")
+  private Integer maxAgeMin;
+
+  @Override
+  public Integer getDetachIntervalMin() {
+    return this.detachIntervalMin;
+  }
+
+  @Override
+  public Integer getDetachTimeMin() {
+    return this.detachTimeMin;
+  }
+
+  @Override
+  public Integer getPathUpdateIntervalMin() {
+    return this.pathUpdateIntervalMin;
+  }
+
+  @Override
+  public Integer getMaxAgeMin() {
+    return this.maxAgeMin;
+  }
+
+  public void setDetachIntervalMin(Integer detachIntervalMin) {
+    this.detachIntervalMin = detachIntervalMin;
+  }
+
+  public void setDetachTimeMin(Integer detachTimeMin) {
+    this.detachTimeMin = detachTimeMin;
+  }
+
+  public void setPathUpdateIntervalMin(Integer pathUpdateIntervalMin) {
+    this.pathUpdateIntervalMin = pathUpdateIntervalMin;
+  }
+
+  public void setMaxAgeMin(Integer maxAgeMin) {
+    this.maxAgeMin = maxAgeMin;
+  }
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
index f9d4a7a..1cbbfd5 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
@@ -24,12 +24,13 @@ import com.google.gson.reflect.TypeToken;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.lang.reflect.Type;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public abstract class ConfigItem<PROP_TYPE extends LogFeederProperties> 
implements Cloneable {
+public abstract class ConfigItem<PROP_TYPE extends LogFeederProperties> 
implements Cloneable, Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(ConfigItem.class);
 
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java
index a1e5c12..7fac01a 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java
@@ -18,12 +18,13 @@
  */
 package org.apache.ambari.logfeeder.plugin.common;
 
+import java.io.Serializable;
 import java.util.Properties;
 
 /**
  * Static application level configuration interface for Log Feeder
  */
-public interface LogFeederProperties {
+public interface LogFeederProperties extends Serializable {
 
   /**
    * Get all key-value pairs from static application level Log Feeder 
configuration
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java
index 933f131..54cdb7e 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java
@@ -18,7 +18,9 @@
  */
 package org.apache.ambari.logfeeder.plugin.common;
 
-public class MetricData {
+import java.io.Serializable;
+
+public class MetricData implements Serializable {
   public final String metricsName;
   public final boolean isPointInTime;
 
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java
index e2180d7..f098245 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java
@@ -184,4 +184,8 @@ public abstract class Filter<PROP_TYPE extends 
LogFeederProperties> extends Conf
     // no metrics yet
     return null;
   }
+
+  public Object clone() throws CloneNotSupportedException {
+    return super.clone();
+  }
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
index 368f7d5..a586510 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
@@ -56,6 +56,7 @@ public abstract class Input<PROP_TYPE extends 
LogFeederProperties, INPUT_MARKER
   private Thread thread;
   private LRUCache cache;
   private String cacheKeyField;
+  private boolean initDefaultFields;
   protected MetricData readBytesMetric = new 
MetricData(getReadBytesMetricName(), false);
 
   public void loadConfigs(InputDescriptor inputDescriptor, PROP_TYPE 
logFeederProperties,
@@ -76,8 +77,6 @@ public abstract class Input<PROP_TYPE extends 
LogFeederProperties, INPUT_MARKER
 
   public abstract boolean monitor();
 
-  public abstract List<? extends Input> getChildInputs();
-
   public abstract INPUT_MARKER getInputMarker();
 
   public abstract boolean isReady();
@@ -337,4 +336,16 @@ public abstract class Input<PROP_TYPE extends 
LogFeederProperties, INPUT_MARKER
   public String toString() {
     return getShortDescription();
   }
+
+  public void setFirstFilter(Filter<PROP_TYPE> firstFilter) {
+    this.firstFilter = firstFilter;
+  }
+
+  public boolean isInitDefaultFields() {
+    return initDefaultFields;
+  }
+
+  public void setInitDefaultFields(boolean initDefaultFields) {
+    this.initDefaultFields = initDefaultFields;
+  }
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java
index e0509fe..9ee8743 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java
@@ -20,6 +20,7 @@ package org.apache.ambari.logfeeder.plugin.input.cache;
 
 import com.google.common.collect.EvictingQueue;
 
+import java.io.Serializable;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -28,7 +29,7 @@ import java.util.Map;
  * It won't put already existing entries into the cache map if de-duplication 
interval not higher then a specific value
  * or if the new value is the most recently used one (in case of 
lastDedupEnabled is true)
  */
-public class LRUCache {
+public class LRUCache implements Serializable {
   private final LinkedHashMap<String, Long> keyValueMap;
   private final String fileName;
   private final long dedupInterval;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml 
b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index 884f49f..688c944 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -215,6 +215,11 @@
       <artifactId>netty-all</artifactId>
       <version>4.0.37.Final</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ant</groupId>
+      <artifactId>ant</artifactId>
+      <version>1.7.1</version>
+    </dependency>
     <!-- Exclude jars globally-->
     <dependency>
       <groupId>commons-beanutils</groupId>
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 21b1ea4..2074f93 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -71,6 +71,8 @@ public class FilterGrok extends Filter<LogFeederProps> {
 
   private MetricData grokErrorMetric = new MetricData("filter.error.grok", 
false);
 
+  private boolean skipOnError = false;
+
   @Override
   public void init(LogFeederProps logFeederProps) throws Exception {
     super.init(logFeederProps);
@@ -80,6 +82,7 @@ public class FilterGrok extends Filter<LogFeederProps> {
       multilinePattern = 
escapePattern(((FilterGrokDescriptor)getFilterDescriptor()).getMultilinePattern());
       sourceField = getFilterDescriptor().getSourceField();
       removeSourceField = 
BooleanUtils.toBooleanDefaultIfNull(getFilterDescriptor().isRemoveSourceField(),
 removeSourceField);
+      skipOnError = ((FilterGrokDescriptor) 
getFilterDescriptor()).isSkipOnError();
 
       LOG.info("init() done. grokPattern=" + messagePattern + ", 
multilinePattern=" + multilinePattern + ", " +
       getShortDescription());
@@ -92,6 +95,11 @@ public class FilterGrok extends Filter<LogFeederProps> {
       grokMessage = new Grok();
       loadPatterns(grokMessage);
       grokMessage.compile(messagePattern);
+      if (((FilterGrokDescriptor)getFilterDescriptor()).isDeepExtract()) {
+        extractNamedParams(grokMessage.getNamedRegexCollection());
+      } else {
+        extractNamedParams(messagePattern, namedParamList);
+      }
       if (!StringUtils.isEmpty(multilinePattern)) {
         extractNamedParams(multilinePattern, multiLineamedParamList);
 
@@ -134,6 +142,16 @@ public class FilterGrok extends Filter<LogFeederProps> {
     }
   }
 
+  private void extractNamedParams(Map<String, String> namedRegexCollection) {
+    if (namedRegexCollection != null) {
+      for (String paramValue : namedRegexCollection.values()) {
+        if (paramValue.toLowerCase().equals(paramValue)) {
+          namedParamList.add(paramValue);
+        }
+      }
+    }
+  }
+
   private boolean loadPatterns(Grok grok) {
     InputStreamReader grokPatternsReader = null;
     LOG.info("Loading pattern file " + GROK_PATTERN_FILE);
@@ -166,10 +184,11 @@ public class FilterGrok extends Filter<LogFeederProps> {
 
     if (grokMultiline != null) {
       String jsonStr = grokMultiline.capture(inputStr);
-      if (!"{}".equals(jsonStr)) {
+      if (!"{}".equals(jsonStr) || skipOnError) {
         if (strBuff != null) {
           Map<String, Object> jsonObj = Collections.synchronizedMap(new 
HashMap<String, Object>());
           try {
+            LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, 
false);
             applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr);
           } finally {
             strBuff = null;
@@ -189,6 +208,7 @@ public class FilterGrok extends Filter<LogFeederProps> {
     } else {
       savedInputMarker = inputMarker;
       Map<String, Object> jsonObj = Collections.synchronizedMap(new 
HashMap<String, Object>());
+      LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, false);
       applyMessage(inputStr, jsonObj, null);
     }
   }
@@ -197,6 +217,7 @@ public class FilterGrok extends Filter<LogFeederProps> {
   public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) 
throws Exception {
     if (sourceField != null) {
       savedInputMarker = inputMarker;
+      LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, false);
       applyMessage((String) jsonObj.get(sourceField), jsonObj, null);
       if (removeSourceField) {
         jsonObj.remove(sourceField);
@@ -208,7 +229,7 @@ public class FilterGrok extends Filter<LogFeederProps> {
     String jsonStr = grokMessage.capture(inputStr);
 
     boolean parseError = false;
-    if ("{}".equals(jsonStr)) {
+    if ("{}".equals(jsonStr) && !skipOnError) {
       parseError = true;
       logParseError(inputStr);
 
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 8b5310f..726a237 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -20,15 +20,18 @@ package org.apache.ambari.logfeeder.input;
 
 import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.input.monitor.LogFileDetachMonitor;
+import org.apache.ambari.logfeeder.input.monitor.LogFilePathUpdateMonitor;
 import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
 import org.apache.ambari.logfeeder.input.file.FileCheckInHelper;
 import org.apache.ambari.logfeeder.input.file.ProcessFileHelper;
 import org.apache.ambari.logfeeder.input.file.ResumeLineNumberHelper;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
 import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
 import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
-import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang.ObjectUtils;
 import org.apache.commons.lang3.ArrayUtils;
@@ -39,12 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileFilter;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class InputFile extends Input<LogFeederProps, InputFileMarker> {
 
@@ -55,6 +53,10 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker> {
   private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
   private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 1000;
 
+  private static final int DEFAULT_DETACH_INTERVAL_MIN = 300;
+  private static final int DEFAULT_DETACH_TIME_MIN = 2000;
+  private static final int DEFAULT_LOG_PATH_UPDATE_INTERVAL_MIN = 5;
+
   private boolean isReady;
 
   private boolean tail;
@@ -66,6 +68,10 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker> {
   private String base64FileKey;
   private String checkPointExtension;
   private int checkPointIntervalMS;
+  private int detachIntervalMin;
+  private int detachTimeMin;
+  private int pathUpdateIntervalMin;
+  private Integer maxAgeMin;
 
   private Map<String, File> checkPointFiles = new HashMap<>();
   private Map<String, Long> lastCheckPointTimeMSs = new HashMap<>();
@@ -73,12 +79,21 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker> {
   private Map<String, InputFileMarker> lastCheckPointInputMarkers = new 
HashMap<>();
 
   private Thread thread;
+  private Thread logFileDetacherThread;
+  private Thread logFilePathUpdaterThread;
+  private ThreadGroup threadGroup;
+
+  private boolean multiFolder = false;
+  private Map<String, List<File>> folderMap;
+  private Map<String, InputFile> inputChildMap = new HashMap<>();
 
   @Override
   public boolean isReady() {
     if (!isReady) {
       // Let's try to check whether the file is available
-      logFiles = getActualFiles(logPath);
+      logFiles = getActualInputLogFiles();
+      Map<String, List<File>> foldersMap = 
FileUtil.getFoldersForFiles(logFiles);
+      setFolderMap(foldersMap);
       if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) {
         if (tail && logFiles.length > 1) {
           LOG.warn("Found multiple files (" + logFiles.length + ") for the 
file filter " + filePath +
@@ -132,26 +147,32 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker> {
     return "input.files.read_bytes";
   }
 
-  private File[] getActualFiles(String searchPath) {
-    File searchFile = new File(searchPath);
-    if (!searchFile.getParentFile().exists()) {
-      return new File[0];
-    } else if (searchFile.isFile()) {
-      return new File[]{searchFile};
-    } else {
-      FileFilter fileFilter = new WildcardFileFilter(searchFile.getName());
-      File[] logFiles = searchFile.getParentFile().listFiles(fileFilter);
-      Arrays.sort(logFiles, Comparator.comparing(File::getName));
-      return logFiles;
-    }
-  }
-
   @Override
   public boolean monitor() {
     if (isReady()) {
-      LOG.info("Starting thread. " + getShortDescription());
-      thread = new Thread(this, getNameForThread());
-      thread.start();
+      if (multiFolder) {
+        try {
+          threadGroup = new ThreadGroup(getNameForThread());
+          if (getFolderMap() != null) {
+            for (Map.Entry<String, List<File>> folderFileEntry : 
getFolderMap().entrySet()) {
+              startNewChildInputFileThread(folderFileEntry);
+            }
+            logFilePathUpdaterThread = new Thread(new 
LogFilePathUpdateMonitor((InputFile) this, pathUpdateIntervalMin, 
detachTimeMin), "logfile_path_updater=" + filePath);
+            logFilePathUpdaterThread.setDaemon(true);
+            logFileDetacherThread = new Thread(new 
LogFileDetachMonitor((InputFile) this, detachIntervalMin, detachTimeMin), 
"logfile_detacher=" + filePath);
+            logFileDetacherThread.setDaemon(true);
+
+            logFilePathUpdaterThread.start();
+            logFileDetacherThread.start();
+          }
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        LOG.info("Starting thread. " + getShortDescription());
+        thread = new Thread(this, getNameForThread());
+        thread.start();
+      }
       return true;
     } else {
       return false;
@@ -159,11 +180,6 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker> {
   }
 
   @Override
-  public List<InputFile> getChildInputs() {
-    return null;
-  }
-
-  @Override
   public InputFileMarker getInputMarker() {
     return null;
   }
@@ -179,15 +195,29 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker> {
     setClosed(true);
     logPath = getInputDescriptor().getPath();
     checkPointIntervalMS = (int) 
ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)getInputDescriptor()).getCheckpointIntervalMs(),
 DEFAULT_CHECKPOINT_INTERVAL_MS);
-
+    detachIntervalMin = (int) 
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDetachIntervalMin(),
 DEFAULT_DETACH_INTERVAL_MIN * 60);
+    detachTimeMin = (int) 
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDetachTimeMin(),
 DEFAULT_DETACH_TIME_MIN * 60);
+    pathUpdateIntervalMin = (int) 
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getPathUpdateIntervalMin(),
 DEFAULT_LOG_PATH_UPDATE_INTERVAL_MIN * 60);
+    maxAgeMin = (int) 
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getMaxAgeMin(),
 0);
+    boolean initDefaultFields = 
BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().isInitDefaultFields(), 
false);
+    setInitDefaultFields(initDefaultFields);
     if (StringUtils.isEmpty(logPath)) {
       LOG.error("path is empty for file input. " + getShortDescription());
       return;
     }
 
     setFilePath(logPath);
+    // Check there can have pattern in folder
+    if (getFilePath() != null && getFilePath().contains("/")) {
+      int lastIndexOfSlash = getFilePath().lastIndexOf("/");
+      String folderBeforeLogName = getFilePath().substring(0, 
lastIndexOfSlash);
+      if (folderBeforeLogName.contains("*")) {
+        LOG.info("Found regex in folder path ('" + getFilePath() + "'), will 
check against multiple folders.");
+        setMultiFolder(true);
+      }
+    }
     boolean isFileReady = isReady();
-    LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + 
isReady());
+    LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + 
isFileReady);
 
     LogEntryCacheConfig cacheConfig = logFeederProps.getLogEntryCacheConfig();
     initCache(
@@ -265,6 +295,73 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker> {
     }
   }
 
+  /**
+   * Clones this input for one concrete folder and starts the clone on its own thread.
+   * The clone is registered in the child map under "&lt;folder&gt;/&lt;file name part of the configured path&gt;".
+   * No thread is started when a max age is configured (non-zero) and the folder's first file is older than that.
+   *
+   * @param folderFileEntry folder path mapped to the log files found in that folder
+   * @throws CloneNotSupportedException if this input cannot be cloned
+   */
+  public void startNewChildInputFileThread(Map.Entry<String, List<File>> folderFileEntry) throws CloneNotSupportedException {
+    String folderPath = folderFileEntry.getKey();
+    LOG.info("Start child input thread - " + folderPath);
+    InputFile clonedObject = (InputFile) this.clone();
+    String filePath = new File(getFilePath()).getName();
+    String fullPathWithWildCard = String.format("%s/%s", folderPath, filePath);
+    List<File> folderFiles = folderFileEntry.getValue();
+    // The file name part can itself contain a wildcard, in which case "<folder>/<pattern>" is not a real
+    // file: lastModified() would be 0 and the input would always look too old. Check the first real file
+    // of the folder instead (the monitors treat the first list element as the tail file).
+    File ageCheckFile = folderFiles.isEmpty() ? new File(fullPathWithWildCard) : folderFiles.get(0);
+    if (clonedObject.getMaxAgeMin() != 0 && FileUtil.isFileTooOld(ageCheckFile, clonedObject.getMaxAgeMin().longValue())) {
+      LOG.info(String.format("File ('%s') is too old (max age min: %d), monitor thread not starting...", ageCheckFile.getAbsolutePath(), clonedObject.getMaxAgeMin()));
+    } else {
+      clonedObject.setMultiFolder(false);
+      clonedObject.logFiles = folderFiles.toArray(new File[0]); // TODO: works only with tail
+      clonedObject.logPath = fullPathWithWildCard;
+      // the clone must not share the parent's monitor threads or child registry
+      clonedObject.setLogFileDetacherThread(null);
+      clonedObject.setLogFilePathUpdaterThread(null);
+      clonedObject.setInputChildMap(new HashMap<>());
+      copyFilters(clonedObject, getFirstFilter());
+      Thread thread = new Thread(threadGroup, clonedObject, "file=" + fullPathWithWildCard);
+      clonedObject.setThread(thread);
+      inputChildMap.put(fullPathWithWildCard, clonedObject);
+      thread.start();
+    }
+  }
+
+  /**
+   * Clones the filter chain that starts at {@code firstFilter} and attaches the copy to {@code clonedInput}.
+   * Every cloned filter gets {@code clonedInput} as its input; the last clone's next filter is set to null.
+   * Does nothing when {@code firstFilter} is null. Failures are logged, not propagated.
+   *
+   * @param clonedInput input that receives the cloned chain
+   * @param firstFilter head of the chain to copy, may be null
+   */
+  private void copyFilters(InputFile clonedInput, Filter firstFilter) {
+    if (firstFilter == null) {
+      return;
+    }
+    try {
+      LOG.info("Cloning filters for input=" + clonedInput.logPath);
+      Filter clonedHead = (Filter) firstFilter.clone();
+      clonedHead.setInput(clonedInput);
+      clonedInput.setFirstFilter(clonedHead);
+      Filter clonedTail = clonedHead;
+      for (Filter next = firstFilter.getNextFilter(); next != null; next = next.getNextFilter()) {
+        Filter clonedNext = (Filter) next.clone();
+        clonedNext.setInput(clonedInput);
+        clonedTail.setNextFilter(clonedNext);
+        clonedTail = clonedNext;
+      }
+      // a clone may still reference the original chain's successor; cut it explicitly
+      clonedTail.setNextFilter(null);
+      LOG.info("Cloning filters has finished for input=" + clonedInput.logPath);
+    } catch (Exception e) {
+      // keep the cause in the log, otherwise a failed clone cannot be diagnosed
+      LOG.error("Could not clone filters for input=" + clonedInput.logPath, e);
+    }
+  }
+
+  /**
+   * Closes the child input registered for the given folder, interrupts its thread if it is still alive,
+   * and removes it from the child map. Logs a warning when no child is registered for that folder.
+   *
+   * @param folderPathKey folder path; the child key is "&lt;folder&gt;/&lt;file name part of the configured path&gt;"
+   */
+  public void stopChildInputFileThread(String folderPathKey) {
+    LOG.info("Stop child input thread - " + folderPathKey);
+    String logFileName = new File(getFilePath()).getName();
+    String childKey = String.format("%s/%s", folderPathKey, logFileName);
+    if (!inputChildMap.containsKey(childKey)) {
+      LOG.warn(childKey + " not found as an input child.");
+      return;
+    }
+    InputFile child = inputChildMap.get(childKey);
+    child.setClosed(true);
+    Thread childThread = child.getThread();
+    if (childThread != null && childThread.isAlive()) {
+      childThread.interrupt();
+    }
+    inputChildMap.remove(childKey);
+  }
+
   @Override
   public boolean isEnabled() {
     return BooleanUtils.isNotFalse(getInputDescriptor().isEnabled());
@@ -291,6 +388,10 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker> {
     setClosed(true);
   }
 
+  /**
+   * Resolves the current log path through {@link FileUtil#getInputFilesByPattern(String)}.
+   *
+   * @return the files currently matching {@code logPath}
+   */
+  public File[] getActualInputLogFiles() {
+    return FileUtil.getInputFilesByPattern(logPath);
+  }
+
   public String getFilePath() {
     return filePath;
   }
@@ -354,4 +455,59 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker> {
   public Map<String, InputFileMarker> getLastCheckPointInputMarkers() {
     return lastCheckPointInputMarkers;
   }
+
+  /**
+   * @return the multi-folder flag; it is set to true when the folder part of the configured file path
+   * contains '*', and reset to false on child inputs
+   */
+  public boolean isMultiFolder() {
+    return multiFolder;
+  }
+
+  /** Sets the multi-folder flag. */
+  public void setMultiFolder(boolean multiFolder) {
+    this.multiFolder = multiFolder;
+  }
+
+  /**
+   * @return the folder map as last set; the file monitors compare it against the result of
+   * FileUtil.getFoldersForFiles (folder absolute path mapped to the files in that folder)
+   */
+  public Map<String, List<File>> getFolderMap() {
+    return folderMap;
+  }
+
+  /** Replaces the folder map. */
+  public void setFolderMap(Map<String, List<File>> folderMap) {
+    this.folderMap = folderMap;
+  }
+
+  /**
+   * @return the child inputs, keyed by "&lt;folder&gt;/&lt;file name part of the configured path&gt;"
+   */
+  public Map<String, InputFile> getInputChildMap() {
+    return inputChildMap;
+  }
+
+  /** Replaces the child input map. */
+  public void setInputChildMap(Map<String, InputFile> inputChildMap) {
+    this.inputChildMap = inputChildMap;
+  }
+
+  /** @return the thread assigned to this input (for child inputs, the thread they were started on) */
+  @Override
+  public Thread getThread() {
+    return thread;
+  }
+
+  /** Assigns the thread of this input. */
+  @Override
+  public void setThread(Thread thread) {
+    this.thread = thread;
+  }
+
+  /**
+   * @return the detacher thread reference; presumably the thread running the file detach monitor -
+   * TODO confirm where it is created. It is reset to null on child inputs.
+   */
+  public Thread getLogFileDetacherThread() {
+    return logFileDetacherThread;
+  }
+
+  /** Sets the detacher thread reference. */
+  public void setLogFileDetacherThread(Thread logFileDetacherThread) {
+    this.logFileDetacherThread = logFileDetacherThread;
+  }
+
+  /**
+   * @return the path updater thread reference; presumably the thread running the file path update
+   * monitor - TODO confirm where it is created. It is reset to null on child inputs.
+   */
+  public Thread getLogFilePathUpdaterThread() {
+    return logFilePathUpdaterThread;
+  }
+
+  /** Sets the path updater thread reference. */
+  public void setLogFilePathUpdaterThread(Thread logFilePathUpdaterThread) {
+    this.logFilePathUpdaterThread = logFilePathUpdaterThread;
+  }
+
+  /**
+   * @return the maximum file age in minutes; 0 (the default when not configured) disables the age checks
+   */
+  public Integer getMaxAgeMin() {
+    return maxAgeMin;
+  }
+
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
index 70aa681..40475c6 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.ambari.logfeeder.input;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.input.monitor.CheckpointCleanupMonitor;
 import org.apache.ambari.logfeeder.plugin.common.MetricData;
 import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.manager.InputManager;
@@ -152,6 +153,10 @@ public class InputManagerImpl extends InputManager {
 
     if (isCheckPointFolderValid) {
       LOG.info("Using folder " + checkPointFolderFile + " for storing 
checkpoints");
+      // check checkpoint cleanup every 2000 min
+      Thread checkpointCleanupThread = new Thread(new 
CheckpointCleanupMonitor(this, 2000),"checkpoint_cleanup");
+      checkpointCleanupThread.setDaemon(true);
+      checkpointCleanupThread.start();
     } else {
       throw new IllegalStateException("Could not determine the checkpoint 
folder.");
     }
@@ -304,6 +309,10 @@ public class InputManagerImpl extends InputManager {
 
         String logFilePath = (String) jsonCheckPoint.get("file_path");
         String logFileKey = (String) jsonCheckPoint.get("file_key");
+        Integer maxAgeMin = null;
+        if (jsonCheckPoint.containsKey("max_age_min")) {
+          maxAgeMin = 
Integer.parseInt(jsonCheckPoint.get("max_age_min").toString());
+        }
         if (logFilePath != null && logFileKey != null) {
           boolean deleteCheckPointFile = false;
           File logFile = new File(logFilePath);
@@ -314,6 +323,9 @@ public class InputManagerImpl extends InputManager {
               LOG.info("CheckPoint clean: File key has changed. old=" + 
logFileKey + ", new=" + fileBase64 + ", filePath=" +
                 logFilePath + ", checkPointFile=" + 
checkPointFile.getAbsolutePath());
               deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), 
logFileKey);
+            } else if (maxAgeMin != null && maxAgeMin != 0 && 
FileUtil.isFileTooOld(logFile, maxAgeMin)) {
+              deleteCheckPointFile = true;
+              LOG.info("Checkpoint clean: File reached max age minutes (" + 
maxAgeMin + "):" + logFilePath);
             }
           } else {
             LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + 
logFilePath + ", checkPointFile=" +
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
index 9c607cf..7b8f0cd 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
@@ -58,6 +58,9 @@ public class FileCheckInHelper {
       }
       inputFile.getLastCheckPointTimeMSs().put(inputMarker.getBase64FileKey(), 
currMS);
 
+      if (inputFile.getMaxAgeMin() != 0) {
+        jsonCheckPoint.put("max_age_min", inputFile.getMaxAgeMin().toString());
+      }
       jsonCheckPoint.put("line_number", "" + new 
Integer(inputMarker.getLineNumber()));
       jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
       jsonCheckPoint.put("last_write_time_date", new Date());
@@ -89,5 +92,4 @@ public class FileCheckInHelper {
   }
 
 
-
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
new file mode 100644
index 0000000..e0acde1
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for periodic monitors of an {@link InputFile}: sleeps for the configured interval,
+ * calls {@link #monitorAndUpdate()}, and repeats until the running thread is interrupted.
+ */
+public abstract class AbstractLogFileMonitor implements Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractLogFileMonitor.class);
+
+  private final InputFile inputFile;
+  private final int waitInterval;
+  private final int detachTime;
+
+  /**
+   * @param inputFile    input whose files are monitored
+   * @param waitInterval pause between two monitor passes, in seconds
+   * @param detachTime   age limit handed to subclasses (they pass it to FileUtil.isFileTooOld as minutes)
+   */
+  AbstractLogFileMonitor(InputFile inputFile, int waitInterval, int detachTime) {
+    this.inputFile = inputFile;
+    this.waitInterval = waitInterval;
+    this.detachTime = detachTime;
+  }
+
+  public InputFile getInputFile() {
+    return inputFile;
+  }
+
+  public int getDetachTime() {
+    return detachTime;
+  }
+
+  /**
+   * Runs the monitor loop. A failing pass is logged and the loop continues; an interrupt ends the loop.
+   */
+  @Override
+  public void run() {
+    LOG.info(getStartLog());
+
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        // long arithmetic: a large interval must not overflow int
+        Thread.sleep(1000L * waitInterval);
+        monitorAndUpdate();
+      } catch (InterruptedException e) {
+        // sleep() clears the interrupt flag when it throws; restore it so the loop condition ends the thread
+        LOG.info("Monitor thread interrupted.");
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        LOG.error("Monitor thread failed during monitor pass.", e);
+      }
+    }
+  }
+
+  /** @return the message logged once when the monitor thread starts */
+  protected abstract String getStartLog();
+
+  /**
+   * Executes one monitor pass.
+   *
+   * @throws Exception on any failure; the run loop logs it and continues with the next pass
+   */
+  protected abstract void monitorAndUpdate() throws Exception;
+}
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
new file mode 100644
index 0000000..65a314e
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.plugin.manager.InputManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Periodically asks the {@link InputManager} to clean its checkpoint files, until the running
+ * thread is interrupted.
+ */
+public class CheckpointCleanupMonitor implements Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CheckpointCleanupMonitor.class);
+
+  private final long waitIntervalMin;
+  private final InputManager inputManager;
+
+  /**
+   * @param inputManager    manager whose checkpoint files are cleaned
+   * @param waitIntervalMin pause between two cleanups, in minutes
+   */
+  public CheckpointCleanupMonitor(InputManager inputManager, long waitIntervalMin) {
+    this.waitIntervalMin = waitIntervalMin;
+    this.inputManager = inputManager;
+  }
+
+  /**
+   * Runs the cleanup loop. A failing cleanup is logged and retried on the next pass; an interrupt ends the loop.
+   */
+  @Override
+  public void run() {
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        Thread.sleep(1000 * 60 * waitIntervalMin);
+        inputManager.cleanCheckPointFiles();
+      } catch (InterruptedException e) {
+        // sleep() clears the interrupt flag when it throws; restore it so the loop condition ends the thread
+        LOG.info("Cleanup checkpoint files thread interrupted.");
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        LOG.error("Cleanup of checkpoint files failed.", e);
+      }
+    }
+  }
+}
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
new file mode 100644
index 0000000..a40e118
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Detach log files in case of folders do not exist or monitored files are too old
+ */
+public class LogFileDetachMonitor extends AbstractLogFileMonitor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogFileDetachMonitor.class);
+
+  /**
+   * @param inputFile  parent input whose child inputs are checked
+   * @param interval   pause between two checks (see {@link AbstractLogFileMonitor})
+   * @param detachTime age limit passed to FileUtil.isFileTooOld as minutes
+   */
+  public LogFileDetachMonitor(InputFile inputFile, int interval, int detachTime) {
+    super(inputFile, interval, detachTime);
+  }
+
+  @Override
+  public String getStartLog() {
+    return "Start file detach monitor thread for " + getInputFile().getFilePath();
+  }
+
+  /**
+   * Walks the known folders of the input. For a folder that still exists, child inputs under it are
+   * stopped when the folder's first file is too old. For a folder that is gone, child inputs under it
+   * are stopped and the input's folder map is replaced by the freshly scanned one.
+   */
+  @Override
+  protected void monitorAndUpdate() throws Exception {
+    File[] logFiles = getInputFile().getActualInputLogFiles();
+    Map<String, List<File>> actualFolderMap = FileUtil.getFoldersForFiles(logFiles);
+
+    // create map copies (stopChildInputFileThread / setFolderMap modify the originals while we iterate)
+    Map<String, InputFile> copiedInputFileMap = new HashMap<>(getInputFile().getInputChildMap());
+    Map<String, List<File>> copiedFolderMap = new HashMap<>(getInputFile().getFolderMap());
+    // detach old entries
+    for (Map.Entry<String, List<File>> entry : copiedFolderMap.entrySet()) {
+      String folderPath = entry.getKey();
+      if (new File(folderPath).exists()) {
+        List<File> folderFiles = entry.getValue();
+        if (folderFiles == null || folderFiles.isEmpty()) {
+          // no file to take the age from
+          continue;
+        }
+        File monitoredFile = folderFiles.get(0);
+        for (Map.Entry<String, InputFile> inputFileEntry : copiedInputFileMap.entrySet()) {
+          if (inputFileEntry.getKey().startsWith(folderPath)
+            && FileUtil.isFileTooOld(monitoredFile, getDetachTime())) {
+            LOG.info("File ('{}') in folder ('{}') is too old (reached {} minutes), detach input thread.",
+              monitoredFile.getAbsolutePath(), folderPath, getDetachTime());
+            getInputFile().stopChildInputFileThread(folderPath);
+          }
+        }
+      } else {
+        LOG.info("Folder not exists. ({}) Stop thread.", folderPath);
+        for (Map.Entry<String, InputFile> inputFileEntry : copiedInputFileMap.entrySet()) {
+          if (inputFileEntry.getKey().startsWith(folderPath)) {
+            getInputFile().stopChildInputFileThread(folderPath);
+            getInputFile().setFolderMap(actualFolderMap);
+          }
+        }
+      }
+    }
+  }
+}
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
new file mode 100644
index 0000000..bfcab5d
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Update log file paths periodically, useful if the log file name pattern format is like:
+ * mylog-2017-10-09.log (so the tail one can change)
+ */
+public class LogFilePathUpdateMonitor extends AbstractLogFileMonitor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogFilePathUpdateMonitor.class);
+
+  /**
+   * @param inputFile  parent input whose folders are rescanned
+   * @param interval   pause between two scans (see {@link AbstractLogFileMonitor})
+   * @param detachTime age limit passed to FileUtil.isFileTooOld as minutes
+   */
+  public LogFilePathUpdateMonitor(InputFile inputFile, int interval, int detachTime) {
+    super(inputFile, interval, detachTime);
+  }
+
+  @Override
+  public String getStartLog() {
+    return "Start file path update monitor thread for " + getInputFile().getFilePath();
+  }
+
+  /**
+   * Rescans the input's files. For a known folder whose first (tail) file changed, the child input is
+   * restarted. For an unknown folder, a child input is started unless its first file is too old.
+   */
+  @Override
+  protected void monitorAndUpdate() throws Exception {
+    File[] logFiles = getInputFile().getActualInputLogFiles();
+    Map<String, List<File>> foldersMap = FileUtil.getFoldersForFiles(logFiles);
+    // NOTE(review): neither this method nor InputFile.startNewChildInputFileThread updates the map
+    // returned by getFolderMap(); confirm it is refreshed elsewhere, otherwise the same change is
+    // detected again on every pass.
+    Map<String, List<File>> originalFoldersMap = getInputFile().getFolderMap();
+    for (Map.Entry<String, List<File>> entry : foldersMap.entrySet()) {
+      if (originalFoldersMap.containsKey(entry.getKey())) {
+        List<File> originalLogFiles = originalFoldersMap.get(entry.getKey());
+        if (!entry.getValue().isEmpty()) { // check tail only for now
+          File lastFile = entry.getValue().get(0);
+          File originalLastFile = originalLogFiles.get(0);
+          if (!originalLastFile.getAbsolutePath().equals(lastFile.getAbsolutePath())) {
+            // old = previously known tail file, new = tail file found by this scan
+            LOG.info("New file found (old: '{}', new: {}), reload thread for {}",
+              originalLastFile.getAbsolutePath(), lastFile.getAbsolutePath(), entry.getKey());
+            getInputFile().stopChildInputFileThread(entry.getKey());
+            getInputFile().startNewChildInputFileThread(entry);
+          }
+        }
+      } else {
+        LOG.info("New log file folder found: {}, start a new thread if tail file is not too old.", entry.getKey());
+        File monitoredFile = entry.getValue().get(0);
+        if (FileUtil.isFileTooOld(monitoredFile, getDetachTime())) {
+          LOG.info("'{}' file is too old. No new thread start needed.", monitoredFile.getAbsolutePath());
+        } else {
+          getInputFile().startNewChildInputFileThread(entry);
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
index e7e0eef..834cf7e 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
@@ -48,7 +48,7 @@ public class OutputManagerImpl extends OutputManager {
 
   private static final int MAX_OUTPUT_SIZE = 32765; // 32766-1
 
-  private List<Output> outputs = new ArrayList<Output>();
+  private List<Output> outputs = new ArrayList<>();
 
   private boolean addMessageMD5 = true;
 
@@ -100,24 +100,7 @@ public class OutputManagerImpl extends OutputManager {
 
     // TODO: Ideally most of the overrides should be configurable
 
-    if (jsonObj.get("type") == null) {
-      jsonObj.put("type", input.getInputDescriptor().getType());
-    }
-    if (input.getClass().isAssignableFrom(InputFile.class)) { // TODO: find 
better solution
-      InputFile inputFile = (InputFile) input;
-      if (jsonObj.get("path") == null && inputFile.getFilePath() != null) {
-        jsonObj.put("path", inputFile.getFilePath());
-      }
-    }
-    if (jsonObj.get("path") == null && input.getInputDescriptor().getPath() != 
null) {
-      jsonObj.put("path", input.getInputDescriptor().getPath());
-    }
-    if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) {
-      jsonObj.put("host", LogFeederUtil.hostName);
-    }
-    if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) {
-      jsonObj.put("ip", LogFeederUtil.ipAddress);
-    }
+    LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, true);
     jsonObj.putIfAbsent("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
 
     if (input.isUseEventMD5() || input.isGenEventMD5()) {
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
index 8ade992..cc6b1e0 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,25 +21,45 @@ package org.apache.ambari.logfeeder.util;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
+import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
-import org.apache.log4j.Logger;
+import org.apache.commons.io.FileUtils;
+import org.apache.tools.ant.DirectoryScanner;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class FileUtil {
-  private static final Logger LOG = Logger.getLogger(FileUtil.class);
-  
+  private static final Logger LOG = LoggerFactory.getLogger(FileUtil.class);
+  private static final String FOLDER_SEPARATOR = "/";
+
   private FileUtil() {
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Lists the files with the given extension in a directory.
+   *
+   * @param directory     directory to scan
+   * @param extension     file extension to match, handed to FileUtils.listFiles
+   * @param checkInSubDir whether sub-directories are scanned too
+   * @return the matching files, or an empty list (after logging an error) when {@code directory}
+   * does not exist or is not a directory
+   */
+  public static List<File> getAllFileFromDir(File directory, String extension, boolean checkInSubDir) {
+    if (!directory.exists()) {
+      LOG.error(directory.getAbsolutePath() + " is not exists ");
+    } else if (!directory.isDirectory()) {
+      LOG.error(directory.getAbsolutePath() + " is not Directory ");
+    } else {
+      // listFiles is declared to return a Collection; copy it instead of downcasting to List
+      return new ArrayList<>(FileUtils.listFiles(directory, new String[]{extension}, checkInSubDir));
+    }
+    return new ArrayList<>();
+  }
+
+
   public static Object getFileKey(File file) {
     try {
       Path fileFullPath = Paths.get(file.getAbsolutePath());
@@ -52,20 +72,111 @@ public class FileUtil {
     }
     return file.toString();
   }
-  
-  public static HashMap<String, Object> getJsonFileContentFromClassPath(String 
fileName) {
+
+  /**
+   * Looks up a resource through the context class loader of the current thread and returns it as a file.
+   *
+   * @param filename resource name
+   * @return the file, or null when the resource is not found or its URL cannot be converted to a file
+   */
+  public static File getFileFromClasspath(String filename) {
+    URL fileCompleteUrl = Thread.currentThread().getContextClassLoader().getResource(filename);
+    LOG.debug("File Complete URI :" + fileCompleteUrl);
+    if (fileCompleteUrl == null) {
+      // resource not on the classpath; do not rely on catching the NullPointerException
+      return null;
+    }
+    try {
+      return new File(fileCompleteUrl.toURI());
+    } catch (Exception exception) {
+      // log the exception itself (its cause is usually null, which would drop the stack trace)
+      LOG.debug(exception.getMessage(), exception);
+      return null;
+    }
+  }
+
+  public static HashMap<String, Object> readJsonFromFile(File jsonFile) {
     ObjectMapper mapper = new ObjectMapper();
-    try (InputStream inputStream = 
FileUtil.class.getClassLoader().getResourceAsStream(fileName)) {
-      return mapper.readValue(inputStream, new TypeReference<HashMap<String, 
Object>>() {});
+    try {
+      HashMap<String, Object> jsonmap = mapper.readValue(jsonFile, new 
TypeReference<HashMap<String, Object>>() {});
+      return jsonmap;
     } catch (IOException e) {
-      LOG.error(e, e.getCause());
+      LOG.error("{}", e);
     }
     return new HashMap<String, Object>();
   }
-  
+
+  /**
+   * Resolves a search path to files. An existing regular file is returned as is. Otherwise, when the
+   * path contains '*', the directory before the first wildcard is scanned with the remainder of the
+   * path as include pattern (case sensitive).
+   *
+   * @param searchPath file path, optionally containing '*' wildcards
+   * @return the matching files; an empty array when nothing matches, the path has no wildcard, or the scan fails
+   */
+  public static File[] getInputFilesByPattern(String searchPath) {
+    File searchFile = new File(searchPath);
+    if (searchFile.isFile()) {
+      return new File[]{searchFile};
+    }
+    if (!searchPath.contains("*")) {
+      LOG.warn("Input file config not found by pattern; {}", searchPath);
+      return new File[]{};
+    }
+    try {
+      String baseDir = getLogDirNameBeforeWildCard(searchPath);
+      String includePattern = searchPath.substring(baseDir.length());
+
+      DirectoryScanner dirScanner = new DirectoryScanner();
+      dirScanner.setIncludes(new String[]{includePattern});
+      dirScanner.setBasedir(baseDir);
+      dirScanner.setCaseSensitive(true);
+      dirScanner.scan();
+      String[] matchedNames = dirScanner.getIncludedFiles();
+
+      if (matchedNames != null && matchedNames.length > 0) {
+        File[] matchedFiles = new File[matchedNames.length];
+        int idx = 0;
+        for (String matchedName : matchedNames) {
+          // scanner results are joined to the base directory by plain concatenation
+          matchedFiles[idx++] = new File(baseDir + matchedName);
+        }
+        return matchedFiles;
+      }
+    } catch (Exception e) {
+      LOG.warn("Input file not found by pattern (exception thrown); {}, message: {}", searchPath, e.getMessage());
+    }
+    return new File[]{};
+  }
+
+  /**
+   * Groups files by the absolute path of their parent folder. Files whose parent folder does not
+   * exist are left out. Each folder's list is sorted in reverse natural order.
+   *
+   * @param inputFiles files to group, may be null or empty
+   * @return folder absolute path mapped to its files; empty map for null or empty input
+   */
+  public static Map<String, List<File>> getFoldersForFiles(File[] inputFiles) {
+    Map<String, List<File>> foldersMap = new HashMap<>();
+    if (inputFiles != null) {
+      for (File inputFile : inputFiles) {
+        // getParentFile() is null for a bare relative name such as "app.log"; resolve to absolute first
+        File folder = inputFile.getAbsoluteFile().getParentFile();
+        if (folder != null && folder.exists()) {
+          foldersMap.computeIfAbsent(folder.getAbsolutePath(), key -> new ArrayList<>()).add(inputFile);
+        }
+      }
+    }
+    for (List<File> folderFiles : foldersMap.values()) {
+      Collections.sort(folderFiles, Collections.reverseOrder());
+    }
+    return foldersMap;
+  }
+
+  /**
+   * Returns the part of the pattern up to and including the last folder separator that precedes the
+   * first '*'. When no separator precedes the wildcard, the text before the wildcard is returned
+   * unchanged (the whole pattern if it has no '*').
+   *
+   * @param pattern path pattern, e.g. "/var/log/app*&#47;out.log" yields "/var/log/"
+   * @return the directory prefix before the first wildcard
+   */
+  private static String getLogDirNameBeforeWildCard(String pattern) {
+    // indexOf instead of split("\\*")[0]: split returns an empty array for a pattern made only of '*'
+    int wildCardIndex = pattern.indexOf('*');
+    String beforeWildCard = wildCardIndex < 0 ? pattern : pattern.substring(0, wildCardIndex);
+    int separatorIndex = beforeWildCard.lastIndexOf(FOLDER_SEPARATOR);
+    if (separatorIndex < 0) {
+      return beforeWildCard;
+    }
+    return beforeWildCard.substring(0, separatorIndex) + FOLDER_SEPARATOR;
+  }
+
   public static void move(File source, File target) throws IOException {
     Path sourcePath = Paths.get(source.getAbsolutePath());
     Path targetPath = Paths.get(target.getAbsolutePath());
     Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE, 
StandardCopyOption.REPLACE_EXISTING);
   }
-}
+
+  /**
+   * Tells whether the file's last modification lies further back than the given number of minutes.
+   *
+   * @param file    file whose last-modified time is read
+   * @param diffMin age limit in minutes
+   * @return true when (now - last modified) is strictly greater than {@code diffMin} minutes
+   */
+  public static boolean isFileTooOld(File file, long diffMin) {
+    long ageMillis = System.currentTimeMillis() - file.lastModified();
+    long limitMillis = diffMin * 1000 * 60;
+    return ageMillis > limitMillis;
+  }
+}
\ No newline at end of file
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
index 6fd00a4..9b0b0e8 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
@@ -21,7 +21,9 @@ package org.apache.ambari.logfeeder.util;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
+import org.apache.ambari.logfeeder.input.InputFile;
 import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -137,4 +139,26 @@ public class LogFeederUtil {
       return false;
     }
   }
+
+  /**
+   * Fills in the default fields "type", "path", "host" and "ip" of a record when they are missing.
+   * Nothing is done unless the marker and its input are non-null and either {@code force} is set
+   * or the input reports {@code isInitDefaultFields()}. A key that already maps to a non-null
+   * value is never overwritten.
+   *
+   * @param jsonObj record that is completed in place
+   * @param inputMarker marker giving access to the originating input; may be null
+   * @param force fill the defaults even if the input does not request them
+   */
+  public static void fillMapWithFieldDefaults(Map<String, Object> jsonObj, InputMarker inputMarker, boolean force) {
+    if (inputMarker == null || inputMarker.getInput() == null) {
+      return;
+    }
+    if (!force && !inputMarker.getInput().isInitDefaultFields()) {
+      return;
+    }
+    if (jsonObj.get("type") == null) {
+      jsonObj.put("type", inputMarker.getInput().getInputDescriptor().getType());
+    }
+    // A file input's own file path is tried before the path configured in the descriptor.
+    if (inputMarker.getInput() instanceof InputFile) {
+      InputFile fileInput = (InputFile) inputMarker.getInput();
+      if (jsonObj.get("path") == null && fileInput.getFilePath() != null) {
+        jsonObj.put("path", fileInput.getFilePath());
+      }
+    }
+    if (jsonObj.get("path") == null && inputMarker.getInput().getInputDescriptor().getPath() != null) {
+      jsonObj.put("path", inputMarker.getInput().getInputDescriptor().getPath());
+    }
+    if (jsonObj.get("host") == null && hostName != null) {
+      jsonObj.put("host", hostName);
+    }
+    if (jsonObj.get("ip") == null && ipAddress != null) {
+      jsonObj.put("ip", ipAddress);
+    }
+  }
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterGrok.java
 
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterGrok.java
index 6de9f4c..215c572 100644
--- 
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterGrok.java
+++ 
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterGrok.java
@@ -41,6 +41,12 @@ public class LSServerFilterGrok extends LSServerFilter {
   @JsonProperty("message_pattern")
   private String messagePattern;
 
+  // No explicit property name is given here, unlike the snake_case name on the neighbouring
+  // property ("message_pattern"). NOTE(review): presumably this is exposed as "skipOnError"
+  // rather than "skip_on_error" - confirm that this is the intended wire name.
+  @JsonProperty
+  private boolean skipOnError;
+
+  // NOTE(review): same naming question as for skipOnError - confirm "deepExtract" vs "deep_extract".
+  @JsonProperty
+  private boolean deepExtract;
+
   public LSServerFilterGrok() {}
 
   public LSServerFilterGrok(FilterDescriptor filterDescriptor) {
@@ -50,6 +56,8 @@ public class LSServerFilterGrok extends LSServerFilter {
       this.log4jFormat = filterGrokDescriptor.getLog4jFormat();
       this.multilinePattern = filterGrokDescriptor.getMultilinePattern();
       this.messagePattern = filterGrokDescriptor.getMessagePattern();
+      this.skipOnError = filterGrokDescriptor.isSkipOnError();
+      this.deepExtract = filterGrokDescriptor.isDeepExtract();
     }
   }
 
@@ -76,4 +84,20 @@ public class LSServerFilterGrok extends LSServerFilter {
   public void setMessagePattern(String messagePattern) {
     this.messagePattern = messagePattern;
   }
+
+  /** @return the current value of the skipOnError flag */
+  public boolean isSkipOnError() {
+    return skipOnError;
+  }
+
+  /** @param skipOnError new value of the skipOnError flag */
+  public void setSkipOnError(boolean skipOnError) {
+    this.skipOnError = skipOnError;
+  }
+
+  /** @return the current value of the deepExtract flag */
+  public boolean isDeepExtract() {
+    return deepExtract;
+  }
+
+  /** @param deepExtract new value of the deepExtract flag */
+  public void setDeepExtract(boolean deepExtract) {
+    this.deepExtract = deepExtract;
+  }
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInput.java
 
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInput.java
index 6ef3d3f..af28f17 100644
--- 
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInput.java
+++ 
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInput.java
@@ -70,6 +70,9 @@ public abstract class LSServerInput {
   
   @JsonProperty("is_enabled")
   private Boolean isEnabled;
+
+  @JsonProperty("init_default_fields")
+  private Boolean initDefaultFields;
   
   public LSServerInput() {}
   
@@ -88,6 +91,7 @@ public abstract class LSServerInput {
     this.cacheSize = inputDescriptor.getCacheSize();
     this.cacheDedupInterval = inputDescriptor.getCacheDedupInterval();
     this.isEnabled = inputDescriptor.isEnabled();
+    this.initDefaultFields = inputDescriptor.isInitDefaultFields();
   }
 
   public String getType() {
@@ -145,4 +149,8 @@ public abstract class LSServerInput {
   public Boolean getIsEnabled() {
     return isEnabled;
   }
+
+  /** @return the init_default_fields setting held by this model; may be null */
+  public Boolean getInitDefaultFields() {
+    return initDefaultFields;
+  }
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
 
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
index bb2a49c..efa56a2 100644
--- 
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
+++ 
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
@@ -19,15 +19,67 @@
 
 package org.apache.ambari.logsearch.model.common;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
 
 import io.swagger.annotations.ApiModel;
+import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
 
 @ApiModel
 public class LSServerInputFile extends LSServerInputFileBase {
+
+  @JsonProperty("detach_interval_min")
+  private Integer detachIntervalMin;
+
+  @JsonProperty("detach_time_min")
+  private Integer detachTimeMin;
+
+  @JsonProperty("path_update_interval_min")
+  private Integer pathUpdateIntervalMin;
+
+  @JsonProperty("max_age_min")
+  private Integer maxAgeMin;
+
   public LSServerInputFile() {}
 
   public LSServerInputFile(InputDescriptor inputDescriptor) {
     super(inputDescriptor);
+    InputFileDescriptor inputFileDescriptor = 
(InputFileDescriptor)inputDescriptor;
+    this.detachIntervalMin = inputFileDescriptor.getDetachIntervalMin();
+    this.detachTimeMin = inputFileDescriptor.getDetachTimeMin();
+    this.pathUpdateIntervalMin = 
inputFileDescriptor.getPathUpdateIntervalMin();
+    this.maxAgeMin = inputFileDescriptor.getMaxAgeMin();
+  }
+
+  public Integer getDetachIntervalMin() {
+    return detachIntervalMin;
+  }
+
+  public void setDetachIntervalMin(Integer detachIntervalMin) {
+    this.detachIntervalMin = detachIntervalMin;
+  }
+
+  public Integer getDetachTimeMin() {
+    return detachTimeMin;
+  }
+
+  public void setDetachTimeMin(Integer detachTimeMin) {
+    this.detachTimeMin = detachTimeMin;
+  }
+
+  public Integer getPathUpdateIntervalMin() {
+    return pathUpdateIntervalMin;
+  }
+
+  public void setPathUpdateIntervalMin(Integer pathUpdateIntervalMin) {
+    this.pathUpdateIntervalMin = pathUpdateIntervalMin;
+  }
+
+  public Integer getMaxAgeMin() {
+    return maxAgeMin;
+  }
+
+  public void setMaxAgeMin(Integer maxAgeMin) {
+    this.maxAgeMin = maxAgeMin;
   }
 }
diff --git 
a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-ambari.json
 
b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-ambari.json
index e3f6f12..081e16e 100644
--- 
a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-ambari.json
+++ 
b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-ambari.json
@@ -1,591 +1,376 @@
 {
-  "input":[
+  "input": [
     {
-      "type":"ambari_agent",
-      "rowtype":"service",
-      "path":"/root/test-logs/ambari-server/ambari-agent.log",
-      "group": "Ambari"
-    },
-    {
-      "type":"ambari_server",
-      "rowtype":"service",
-      "path":"/root/test-logs/ambari-server/ambari-server.log",
-      "group": "Ambari"
-    },
-    {
-      "type":"ambari_alerts",
-      "rowtype":"service",
-      "add_fields":{
-        "level":"INFO"
-      },
-      "path":"/root/test-logs/ambari-server/ambari-alerts.log",
-      "group": "Ambari"
-    },
-    {
-      "type":"ambari_config_changes",
-      "rowtype":"service",
-      "path":"/root/test-logs/ambari-server/ambari-config-changes.log",
-      "group": "Ambari"
-    },
-    {
-      "type":"ambari_eclipselink",
-      "rowtype":"service",
-      "path":"/root/test-logs/ambari-server/ambari-eclipselink.log",
-      "group": "Ambari"
-    },
-    {
-      "type":"ambari_server_check_database",
-      "rowtype":"service",
-      "path":"/root/test-logs/ambari-server/ambari-server-check-database.log",
-      "group": "Ambari"
-    },
-    {
-      "type":"ambari_audit",
-      "rowtype":"audit",
-      "add_fields":{
-        "logType":"AmbariAudit",
-        "enforcer":"ambari-acl",
-        "repoType":"1",
-        "repo":"ambari",
-        "level":"INFO"
+      "type": "ambari_audit",
+      "rowtype": "audit",
+      "add_fields": {
+        "logType": "AmbariAudit",
+        "enforcer": "ambari-acl",
+        "repoType": "1",
+        "repo": "ambari",
+        "level": "INFO"
       },
-      "path":"/root/test-logs/ambari-server/ambari-audit.log"
+      "path": "/root/test-logs/ambari-server/ambari-audit.log"
     }
-
   ],
-  "filter":[
-    {
-      "filter":"grok",
-      "conditions":{
-        "fields":{
-          "type":[
-            "ambari_agent"
-          ]
-
-        }
-
-      },
-      "log4j_format":"",
-      "multiline_pattern":"^(%{LOGLEVEL:level} %{TIMESTAMP_ISO8601:logtime})",
-      "message_pattern":"(?m)^%{LOGLEVEL:level} %{TIMESTAMP_ISO8601:logtime} 
%{JAVAFILE:file}:%{INT:line_number} - %{GREEDYDATA:log_message}",
-      "post_map_values":{
-        "logtime":{
-          "map_date":{
-            "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
-          }
-
-        },
-        "level":{
-          "map_fieldvalue":{
-            "pre_value":"WARNING",
-            "post_value":"WARN"
-          }
-
-        }
-
-      }
-
-    },
-    {
-      "filter":"grok",
-      "conditions":{
-        "fields":{
-          "type":[
-            "ambari_server"
-          ]
-          
-        }
-        
-      },
-      "log4j_format":"%d{DATE} %5p [%t] %c{1}:%L - %m%n",
-      "multiline_pattern":"^(%{USER_SYNC_DATE:logtime})",
-      
"message_pattern":"(?m)^%{USER_SYNC_DATE:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}\\[%{DATA:thread_name}\\]%{SPACE}%{JAVACLASS:logger_name}:%{INT:line_number}%{SPACE}-%{SPACE}%{GREEDYDATA:log_message}",
-      "post_map_values":{
-        "logtime":{
-          "map_date":{
-            "target_date_pattern":"dd MMM yyyy HH:mm:ss"
-          }
-
-        }
-
-      }
-
-    },
-    {
-      "filter":"grok",
-      "conditions":{
-        "fields":{
-          "type":[
-            "ambari_alerts"
-          ]
-          
-        }
-        
-      },
-      "log4j_format":"%d{DATE} %5p [%t] %c{1}:%L - %m%n",
-      "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})",
-      
"message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{GREEDYDATA:log_message}",
-      "post_map_values":{
-        "logtime":{
-          "map_date":{
-            "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
-          }
-
-        }
-
-      }
-
-    },
-    {
-      "filter":"grok",
-      "conditions":{
-        "fields":{
-          "type":[
-            "ambari_config_changes"
-          ]
-          
-        }
-        
-      },
-      "log4j_format":"%d{DATE} %5p [%t] %c{1}:%L - %m%n",
-      "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})",
-      
"message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}-%{SPACE}%{GREEDYDATA:log_message}",
-      "post_map_values":{
-        "logtime":{
-          "map_date":{
-            "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
-          }
-
-        }
-
-      }
-
-    },
+  "filter": [
     {
-      "filter":"grok",
-      "conditions":{
-        "fields":{
-          "type":[
-            "ambari_eclipselink"
-          ]
-          
-        }
-        
-      },
-      "log4j_format":"%d{DATE} %5p [%t] %c{1}:%L - %m%n",
-      "multiline_pattern":"^(\\[EL%{SPACE}%{LOGLEVEL:level}\\])",
-      
"message_pattern":"(?m)^\\[EL%{SPACE}%{LOGLEVEL:level}\\]:%{SPACE}%{TIMESTAMP_ISO8601:logtime}%{GREEDYDATA:log_message}",
-      "post_map_values":{
-        "logtime":{
-          "map_date":{
-            "target_date_pattern":"yyyy-MM-dd HH:mm:ss.SSS"
-          }
-
-        },
-        "level":{
-          "map_fieldvalue":{
-            "pre_value":"Warning",
-            "post_value":"Warn"
-          }
-
-        }
-
-      }
-
-    },
-    {
-      "filter":"grok",
-      "conditions":{
-        "fields":{
-          "type":[
-            "ambari_server_check_database"
-          ]
-          
-        }
-        
-      },
-      "log4j_format":"%d{DATE} %5p [%t] %c{1}:%L - %m%n",
-      "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})",
-      
"message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}-%{SPACE}%{GREEDYDATA:log_message}",
-      "post_map_values":{
-        "logtime":{
-          "map_date":{
-            "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
-          }
-
-        }
-
-      }
-
-    },
-    {
-      "filter":"grok",
-      "conditions":{
-        "fields":{
-          "type":[
+      "filter": "grok",
+      "conditions": {
+        "fields": {
+          "type": [
             "ambari_audit"
           ]
-
         }
-
       },
-      "log4j_format":"%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n",
-      "multiline_pattern":"^(%{TIMESTAMP_ISO8601:evtTime})",
-      
"message_pattern":"(?m)^%{TIMESTAMP_ISO8601:evtTime},%{SPACE}%{GREEDYDATA:log_message}",
-      "post_map_values":{
-        "evtTime":{
-          "map_date":{
-            "target_date_pattern":"yyyy-MM-dd'T'HH:mm:ss.SSSXX"
+      "log4j_format": "%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n",
+      "multiline_pattern": "^(%{TIMESTAMP_ISO8601:evtTime})",
+      "message_pattern": 
"(?m)^%{TIMESTAMP_ISO8601:evtTime},%{SPACE}%{GREEDYDATA:log_message}",
+      "post_map_values": {
+        "evtTime": {
+          "map_date": {
+            "target_date_pattern": "yyyy-MM-dd'T'HH:mm:ss.SSSXX"
           }
-
         }
-
       }
-
     },
     {
-      "filter":"keyvalue",
-      "sort_order":1,
-      "conditions":{
-        "fields":{
-          "type":[
+      "filter": "keyvalue",
+      "sort_order": 1,
+      "conditions": {
+        "fields": {
+          "type": [
             "ambari_audit"
           ]
-
         }
-
       },
-      "source_field":"log_message",
-      "field_split":", ",
-      "value_borders":"()",
-      "post_map_values":{
-        "User":{
-          "map_fieldvalue":{
-            "pre_value":"null",
-            "post_value":"unknown"
-          },
-          "map_fieldname":{
-            "new_fieldname":"reqUser"
+      "source_field": "log_message",
+      "field_split": ", ",
+      "value_borders": "()",
+      "post_map_values": {
+        "User": {
+          "map_fieldvalue": {
+            "pre_value": "null",
+            "post_value": "unknown"
+          },
+          "map_fieldname": {
+            "new_fieldname": "reqUser"
           }
         },
-        "Hostname":{
-          "map_fieldname":{
-            "new_fieldname":"host"
+        "Hostname": {
+          "map_fieldname": {
+            "new_fieldname": "host"
           }
         },
-        "Host name":{
-          "map_fieldname":{
-            "new_fieldname":"host"
+        "Host name": {
+          "map_fieldname": {
+            "new_fieldname": "host"
           }
         },
-        "RemoteIp":{
-          "map_fieldname":{
-            "new_fieldname":"cliIP"
+        "RemoteIp": {
+          "map_fieldname": {
+            "new_fieldname": "cliIP"
           }
         },
-        "RequestType":{
-          "map_fieldname":{
-            "new_fieldname":"cliType"
+        "RequestType": {
+          "map_fieldname": {
+            "new_fieldname": "cliType"
           }
         },
-        "Operation":{
-          "map_fieldname":{
-            "new_fieldname":"action"
+        "Operation": {
+          "map_fieldname": {
+            "new_fieldname": "action"
           }
         },
-        "url":{
-          "map_fieldname":{
-            "new_fieldname":"resource"
+        "url": {
+          "map_fieldname": {
+            "new_fieldname": "resource"
           }
         },
-        "ResourcePath":{
-          "map_fieldname":{
-            "new_fieldname":"resource"
+        "ResourcePath": {
+          "map_fieldname": {
+            "new_fieldname": "resource"
           }
         },
-        "Cluster name":{
-          "map_fieldname":{
-            "new_fieldname":"cluster"
+        "Cluster name": {
+          "map_fieldname": {
+            "new_fieldname": "cluster"
           }
         },
-        "Reason":{
-          "map_fieldname":{
-            "new_fieldname":"reason"
+        "Reason": {
+          "map_fieldname": {
+            "new_fieldname": "reason"
           }
         },
-        "Base URL":{
-          "map_fieldname":{
-            "new_fieldname":"ws_base_url"
+        "Base URL": {
+          "map_fieldname": {
+            "new_fieldname": "ws_base_url"
           }
         },
-        "Command":{
-          "map_fieldvalue":{
-            "pre_value":"null",
-            "post_value":"unknown"
+        "Command": {
+          "map_fieldvalue": {
+            "pre_value": "null",
+            "post_value": "unknown"
           },
-          "map_fieldname":{
-            "new_fieldname":"ws_command"
+          "map_fieldname": {
+            "new_fieldname": "ws_command"
           }
         },
-        "Component":{
-          "map_fieldname":{
-            "new_fieldname":"ws_component"
+        "Component": {
+          "map_fieldname": {
+            "new_fieldname": "ws_component"
           }
         },
-        "Details":{
-          "map_fieldname":{
-            "new_fieldname":"ws_details"
+        "Details": {
+          "map_fieldname": {
+            "new_fieldname": "ws_details"
           }
         },
-        "Display name":{
-          "map_fieldvalue":{
-            "pre_value":"null",
-            "post_value":"unknown"
+        "Display name": {
+          "map_fieldvalue": {
+            "pre_value": "null",
+            "post_value": "unknown"
           },
-          "map_fieldname":{
-            "new_fieldname":"ws_display_name"
+          "map_fieldname": {
+            "new_fieldname": "ws_display_name"
           }
         },
-        "OS":{
-          "map_fieldname":{
-            "new_fieldname":"ws_os"
+        "OS": {
+          "map_fieldname": {
+            "new_fieldname": "ws_os"
           }
         },
-        "Repo id":{
-          "map_fieldname":{
-            "new_fieldname":"ws_repo_id"
+        "Repo id": {
+          "map_fieldname": {
+            "new_fieldname": "ws_repo_id"
           }
         },
-        "Repo version":{
-          "map_fieldvalue":{
-            "pre_value":"null",
-            "post_value":"unknown"
+        "Repo version": {
+          "map_fieldvalue": {
+            "pre_value": "null",
+            "post_value": "unknown"
           },
-          "map_fieldname":{
-            "new_fieldname":"ws_repo_version"
+          "map_fieldname": {
+            "new_fieldname": "ws_repo_version"
           }
         },
-        "Repositories":{
-          "map_fieldname":{
-            "new_fieldname":"ws_repositories"
+        "Repositories": {
+          "map_fieldname": {
+            "new_fieldname": "ws_repositories"
           }
         },
-        "RequestId":{
-          "map_fieldname":{
-            "new_fieldname":"ws_request_id"
+        "RequestId": {
+          "map_fieldname": {
+            "new_fieldname": "ws_request_id"
           }
         },
-        "Roles":{
-          "map_fieldname":{
-            "new_fieldname":"ws_roles"
+        "Roles": {
+          "map_fieldname": {
+            "new_fieldname": "ws_roles"
           }
         },
-        "Stack":{
-          "map_fieldname":{
-            "new_fieldname":"ws_stack"
+        "Stack": {
+          "map_fieldname": {
+            "new_fieldname": "ws_stack"
           }
         },
-        "Stack version":{
-          "map_fieldname":{
-            "new_fieldname":"ws_stack_version"
+        "Stack version": {
+          "map_fieldname": {
+            "new_fieldname": "ws_stack_version"
           }
         },
-        "TaskId":{
-          "map_fieldname":{
-            "new_fieldname":"ws_task_id"
+        "TaskId": {
+          "map_fieldname": {
+            "new_fieldname": "ws_task_id"
           }
         },
-        "VersionNote":{
-          "map_fieldvalue":{
-            "pre_value":"null",
-            "post_value":"unknown"
+        "VersionNote": {
+          "map_fieldvalue": {
+            "pre_value": "null",
+            "post_value": "unknown"
           },
-          "map_fieldname":{
-            "new_fieldname":"ws_version_note"
+          "map_fieldname": {
+            "new_fieldname": "ws_version_note"
           }
         },
-        "VersionNumber":{
-          "map_fieldvalue":{
-            "pre_value":"null",
-            "post_value":"unknown"
+        "VersionNumber": {
+          "map_fieldvalue": {
+            "pre_value": "null",
+            "post_value": "unknown"
           },
-          "map_fieldname":{
-            "new_fieldname":"ws_version_number"
+          "map_fieldname": {
+            "new_fieldname": "ws_version_number"
           }
         },
-        "Status":[
-         {
-           "map_fieldcopy":{
-             "copy_name": "ws_status"
-           }
-         },
-         {
-            "map_fieldvalue":{
-              "pre_value":"Success",
-              "post_value":"1"
+        "Status": [
+          {
+            "map_fieldcopy": {
+              "copy_name": "ws_status"
+            }
+          },
+          {
+            "map_fieldvalue": {
+              "pre_value": "Success",
+              "post_value": "1"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"Successfully queued",
-              "post_value":"1"
+            "map_fieldvalue": {
+              "pre_value": "Successfully queued",
+              "post_value": "1"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"QUEUED",
-              "post_value":"1"
+            "map_fieldvalue": {
+              "pre_value": "QUEUED",
+              "post_value": "1"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"PENDING",
-              "post_value":"1"
+            "map_fieldvalue": {
+              "pre_value": "PENDING",
+              "post_value": "1"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"COMPLETED",
-              "post_value":"1"
+            "map_fieldvalue": {
+              "pre_value": "COMPLETED",
+              "post_value": "1"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"IN_PROGRESS",
-              "post_value":"1"
+            "map_fieldvalue": {
+              "pre_value": "IN_PROGRESS",
+              "post_value": "1"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"Failed",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "Failed",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"Failed to queue",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "Failed to queue",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"HOLDING",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "HOLDING",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"HOLDING_FAILED",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "HOLDING_FAILED",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"HOLDING_TIMEDOUT",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "HOLDING_TIMEDOUT",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"FAILED",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "FAILED",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"TIMEDOUT",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "TIMEDOUT",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"ABORTED",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "ABORTED",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"SKIPPED_FAILED",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "SKIPPED_FAILED",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldname":{
-              "new_fieldname":"result"
+            "map_fieldname": {
+              "new_fieldname": "result"
             }
           }
         ],
-        "ResultStatus":[
+        "ResultStatus": [
           {
-            "map_fieldcopy":{
+            "map_fieldcopy": {
               "copy_name": "ws_result_status"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"200 OK",
-              "post_value":"1"
+            "map_fieldvalue": {
+              "pre_value": "200 OK",
+              "post_value": "1"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"201 Created",
-              "post_value":"1"
+            "map_fieldvalue": {
+              "pre_value": "201 Created",
+              "post_value": "1"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"202 Accepted",
-              "post_value":"1"
+            "map_fieldvalue": {
+              "pre_value": "202 Accepted",
+              "post_value": "1"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"400 Bad Request",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "400 Bad Request",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"401 Unauthorized",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "401 Unauthorized",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"403 Forbidden",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "403 Forbidden",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"404 Not Found",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "404 Not Found",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"409 Resource Conflict",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "409 Resource Conflict",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldvalue":{
-              "pre_value":"500 Internal Server Error",
-              "post_value":"0"
+            "map_fieldvalue": {
+              "pre_value": "500 Internal Server Error",
+              "post_value": "0"
             }
           },
           {
-            "map_fieldname":{
-              "new_fieldname":"result"
+            "map_fieldname": {
+              "new_fieldname": "result"
             }
           }
         ]
-
       }
-
     }
-
   ]
-
 }
diff --git 
a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
 
b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
new file mode 100644
index 0000000..68e6fcf
--- /dev/null
+++ 
b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
@@ -0,0 +1,75 @@
+{
+  "input":[
+    {
+      "type":"storm_worker",
+      "rowtype":"service",
+      "path":"/root/test-logs/storm/worker-logs/*/*/worker.log",
+      "init_default_fields": "true"
+    }
+  ],
+  "filter":[
+    {
+      "filter":"grok",
+      "sort_order": 1,
+      "conditions":{
+        "fields":{
+          "type":[
+            "storm_worker"
+          ]
+        }
+      },
+      "log4j_format":"",
+      "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})",
+      
"message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{JAVACLASS:logger_name}\\s%{GREEDYDATA:thread_name}\\s\\[%{LOGLEVEL:level}\\]\\s%{GREEDYDATA:log_message}",
+      "post_map_values":{
+        "logtime":{
+          "map_date":{
+            "target_date_pattern":"yyyy-MM-dd HH:mm:ss.SSS"
+          }
+        }
+      }
+    },
+    {
+      "filter":"grok",
+      "sort_order": 2,
+      "conditions":{
+        "fields":{
+          "type":[
+            "storm_worker"
+          ]
+        }
+      },
+      "source_field": "thread_name",
+      "remove_source_field": "false",
+      
"message_pattern":"(Thread\\-[\\-0-9]+\\-*[\\-0-9]*\\-%{DATA:sdi_storm_component_name}\\-executor%{DATA}|%{DATA:thread_name})"
+    },
+    {
+      "filter":"grok",
+      "sort_order": 3,
+      "conditions":{
+        "fields":{
+          "type":[
+            "storm_worker"
+          ]
+        }
+      },
+      "source_field": "path",
+      "remove_source_field": "false",
+      
"message_pattern":"/root/test-logs/storm/worker-logs/%{DATA:sdi_storm_topology_id}/%{DATA:sdi_storm_worker_port}/worker\\.log"
+    },
+    {
+      "filter":"grok",
+      "sort_order": 4,
+      "conditions":{
+        "fields":{
+          "type":[
+            "storm_worker"
+          ]
+        }
+      },
+      "source_field": "sdi_storm_topology_id",
+      "remove_source_field": "false",
+      
"message_pattern":"(streamline\\-%{DATA:sdi_streamline_topology_id}\\-%{DATA:sdi_streamline_topology_name}\\-[0-9]+\\-[0-9]+)|(%{DATA:sdi_storm_topology_id})"
+    }
+  ]
+}
\ No newline at end of file
diff --git 
a/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6700/worker.log
 
b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6700/worker.log
new file mode 100644
index 0000000..b6a59ec
--- /dev/null
+++ 
b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6700/worker.log
@@ -0,0 +1,5 @@
+2017-10-23 13:41:43.481 o.a.s.d.executor Thread-11-__acker-executor[7 8] 
[INFO] Preparing bolt __acker:(1)
+2017-10-23 13:41:43.483 o.a.s.d.executor Thread-11-__acker-executor[7 8] 
[WARN] Prepared bolt __acker:(2)
+2017-10-23 13:41:48.834 c.h.s.s.n.EmailNotifier 
Thread-7-8-NOTIFICATION-executor[3 3] [ERROR] Got exception while initializing 
transport
+2017-10-23 13:41:58.242 o.a.s.d.executor main [INFO] Loading executor 
3-NOTIFICATION:[9 1]
+2017-10-23 13:41:59.242 o.a.s.d.executor Thread-11-__acker-executor[7 8] 
[WARN] Prepared bolt __acker:(3)
diff --git 
a/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
 
b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
new file mode 100644
index 0000000..5f2d20e
--- /dev/null
+++ 
b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
@@ -0,0 +1,5 @@
+2017-10-23 13:41:43.481 o.a.s.d.executor Thread-11-__acker-executor[5 5] 
[INFO] Preparing bolt __acker:(4)
+2017-10-23 13:41:43.483 o.a.s.d.executor Thread-11-__acker-executor[5 5] 
[WARN] Prepared bolt __acker:(5)
+2017-10-23 13:41:48.834 c.h.s.s.n.EmailNotifier 
Thread-5-3-NOTIFICATION-executor[3 3] [ERROR] Got exception while initializing 
transport
+2017-10-23 13:41:58.242 o.a.s.d.executor main [INFO] Loading executor 
3-NOTIFICATION:[3 1]
+2017-10-23 13:41:59.242 o.a.s.d.executor Thread-11-__acker-executor[5 5] 
[WARN] Prepared bolt __acker:(6)

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to