Repository: ambari
Updated Branches:
  refs/heads/branch-2.6 973ec4640 -> b68b2ea53


AMBARI-22600. LogFeeder: filters for wildcard input paths need to be cloned 
(oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b68b2ea5
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b68b2ea5
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b68b2ea5

Branch: refs/heads/branch-2.6
Commit: b68b2ea53e48fa351070e9253ce1f240c46458c7
Parents: 973ec46
Author: Oliver Szabo <[email protected]>
Authored: Tue Dec 5 20:50:13 2017 +0100
Committer: Oliver Szabo <[email protected]>
Committed: Wed Dec 6 16:13:12 2017 +0100

----------------------------------------------------------------------
 .../apache/ambari/logfeeder/filter/Filter.java  |  8 ++++-
 .../ambari/logfeeder/filter/FilterGrok.java     |  7 +++--
 .../apache/ambari/logfeeder/input/Input.java    | 33 ++++++++++++++++++++
 .../shipper-conf/input.config-storm.json        | 10 +++---
 .../streamline-1-TestAgg-2-3/6701/worker.log    | 10 ++++--
 .../streamline-2-TestAgg2-4-5/6700/worker.log   |  9 ++++++
 6 files changed, 66 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/b68b2ea5/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
index afd903e..bd760f7 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
@@ -36,7 +36,8 @@ import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
 import org.apache.log4j.Logger;
 import org.apache.log4j.Priority;
 
-public abstract class Filter extends ConfigBlock {
+public abstract class Filter extends ConfigBlock implements Cloneable {
+
   private static final Logger LOG = Logger.getLogger(Filter.class);
 
   protected Input input;
@@ -55,6 +56,11 @@ public abstract class Filter extends ConfigBlock {
     }
   }
 
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    return super.clone();
+  }
+
   @SuppressWarnings("unchecked")
   private void initializePostMapValues() {
     Map<String, Object> postMapValues = (Map<String, Object>) 
getConfigValue("post_map_values");

http://git-wip-us.apache.org/repos/asf/ambari/blob/b68b2ea5/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 49d7e76..e0ae4f0 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -62,6 +62,7 @@ public class FilterGrok extends Filter {
 
   private String sourceField = null;
   private boolean removeSourceField = true;
+  private boolean skipOnError = false;
 
   private Set<String> namedParamList = new HashSet<String>();
   private Set<String> multiLineamedParamList = new HashSet<String>();
@@ -80,6 +81,8 @@ public class FilterGrok extends Filter {
       sourceField = getStringValue("source_field");
       removeSourceField = getBooleanValue("remove_source_field",
         removeSourceField);
+      skipOnError = getBooleanValue("skip_on_error", false);
+
 
       LOG.info("init() done. grokPattern=" + messagePattern + ", 
multilinePattern=" + multilinePattern + ", " +
       getShortDescription());
@@ -181,7 +184,7 @@ public class FilterGrok extends Filter {
 
     if (grokMultiline != null) {
       String jsonStr = grokMultiline.capture(inputStr);
-      if (!"{}".equals(jsonStr)) {
+      if (!"{}".equals(jsonStr) || skipOnError) {
         if (strBuff != null) {
           Map<String, Object> jsonObj = Collections.synchronizedMap(new 
HashMap<String, Object>());
           try {
@@ -226,7 +229,7 @@ public class FilterGrok extends Filter {
     String jsonStr = grokMessage.capture(inputStr);
 
     boolean parseError = false;
-    if ("{}".equals(jsonStr)) {
+    if ("{}".equals(jsonStr) && !skipOnError) {
       parseError = true;
       logParseError(inputStr);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/b68b2ea5/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index 7df0b6e..3808605 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -137,6 +137,10 @@ public abstract class Input extends ConfigBlock implements 
Runnable, Cloneable {
     }
   }
 
+  public void setFirstFilter(Filter firstFilter) {
+    this.firstFilter = firstFilter;
+  }
+
   public void addOutput(Output output) {
     outputList.add(output);
   }
@@ -200,6 +204,7 @@ public abstract class Input extends ConfigBlock implements 
Runnable, Cloneable {
       clonedObject.setLogFileDetacherThread(null);
       clonedObject.setLogFilePathUpdaterThread(null);
       clonedObject.setInputChildMap(new HashMap<String, InputFile>());
+      copyFilters(clonedObject, firstFilter);
       Thread thread = new Thread(threadGroup, clonedObject, "file=" + 
fullPathWithWildCard);
       clonedObject.setThread(thread);
       inputChildMap.put(fullPathWithWildCard, clonedObject);
@@ -207,6 +212,34 @@ public abstract class Input extends ConfigBlock implements 
Runnable, Cloneable {
     }
   }
 
+  private void copyFilters(InputFile clonedInput, Filter firstFilter) {
+    if (firstFilter != null) {
+      try {
+        LOG.info("Cloning filters for input=" + clonedInput.logPath);
+        Filter newFilter = (Filter) firstFilter.clone();
+        newFilter.setInput(clonedInput);
+        clonedInput.setFirstFilter(newFilter);
+        Filter actFilter = firstFilter;
+        Filter actClonedFilter = newFilter;
+        while (actFilter != null) {
+          if (actFilter.getNextFilter() != null) {
+            actFilter = actFilter.getNextFilter();
+            Filter newClonedFilter = (Filter) actFilter.clone();
+            newClonedFilter.setInput(clonedInput);
+            actClonedFilter.setNextFilter(newClonedFilter);
+            actClonedFilter = newClonedFilter;
+          } else {
+            actClonedFilter.setNextFilter(null);
+            actFilter = null;
+          }
+        }
+        LOG.info("Cloning filters has finished for input=" + 
clonedInput.logPath);
+      } catch (Exception e) {
+        LOG.error("Could not clone filters for input=" + clonedInput.logPath);
+      }
+    }
+  }
+
   public void stopChildInputFileThread(String folderPathKey) {
     LOG.info("Stop child input thread - " + folderPathKey);
     String filePath = new File(getFilePath()).getName();

http://git-wip-us.apache.org/repos/asf/ambari/blob/b68b2ea5/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
 
b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
index 34bdcf0..68e6fcf 100644
--- 
a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
+++ 
b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
@@ -20,7 +20,7 @@
       },
       "log4j_format":"",
       "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})",
-      
"message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}\\s%{JAVACLASS:logger_name}\\s%{DATA:thread_name}\\s\\[%{LOGLEVEL:level}\\]\\s%{GREEDYDATA:log_message}",
+      
"message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{JAVACLASS:logger_name}\\s%{GREEDYDATA:thread_name}\\s\\[%{LOGLEVEL:level}\\]\\s%{GREEDYDATA:log_message}",
       "post_map_values":{
         "logtime":{
           "map_date":{
@@ -41,7 +41,7 @@
       },
       "source_field": "thread_name",
       "remove_source_field": "false",
-      
"message_pattern":"(Thread\\-[\\-0-9]+\\-*[\\-0-9]*\\-%{DATA:storm_component_name}\\-executor%{DATA}|%{DATA})"
+      
"message_pattern":"(Thread\\-[\\-0-9]+\\-*[\\-0-9]*\\-%{DATA:sdi_storm_component_name}\\-executor%{DATA}|%{DATA:thread_name})"
     },
     {
       "filter":"grok",
@@ -55,7 +55,7 @@
       },
       "source_field": "path",
       "remove_source_field": "false",
-      
"message_pattern":"/root/test-logs/storm/worker-logs/%{DATA:storm_topology_id}/%{DATA:storm_worker_port}/worker\\.log"
+      
"message_pattern":"/root/test-logs/storm/worker-logs/%{DATA:sdi_storm_topology_id}/%{DATA:sdi_storm_worker_port}/worker\\.log"
     },
     {
       "filter":"grok",
@@ -67,9 +67,9 @@
           ]
         }
       },
-      "source_field": "storm_topology_id",
+      "source_field": "sdi_storm_topology_id",
       "remove_source_field": "false",
-      
"message_pattern":"(streamline\\-%{DATA:streamline_topology_id}\\-%{DATA:streamline_topology_name}\\-[0-9]+\\-[0-9]+)|(%{GREEDYDATA})"
+      
"message_pattern":"(streamline\\-%{DATA:sdi_streamline_topology_id}\\-%{DATA:sdi_streamline_topology_name}\\-[0-9]+\\-[0-9]+)|(%{DATA:sdi_storm_topology_id})"
     }
   ]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/b68b2ea5/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
 
b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
index 6a10ad9..787b9ee 100644
--- 
a/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
+++ 
b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
@@ -1,4 +1,8 @@
-2017-10-23 13:41:43.481 o.a.s.d.executor Thread-11-__acker-executor[5 5] 
[INFO] Preparing bolt __acker:(5)
-2017-10-23 13:41:43.483 o.a.s.d.executor Thread-11-__acker-executor[5 5] 
[INFO] Prepared bolt __acker:(5)
+2017-10-23 13:41:43.481 o.a.s.d.executor Thread-11-__acker-executor[5 5] 
[INFO] Preparing bolt __acker:(4)
+2017-10-23 13:41:43.483 o.a.s.d.executor Thread-11-__acker-executor[5 5] 
[WARN] Prepared bolt __acker:(5)
 2017-10-23 13:41:48.834 c.h.s.s.n.EmailNotifier 
Thread-5-3-NOTIFICATION-executor[3 3] [ERROR] Got exception while initializing 
transport
-2017-10-23 13:41:58.242 o.a.s.d.executor main [INFO] Loading executor 
3-NOTIFICATION:[3 3]
\ No newline at end of file
+2017-10-23 13:41:58.242 o.a.s.d.executor main [INFO] Loading executor 
3-NOTIFICATION:[3 1]
+2017-10-23 13:41:59.242 o.a.s.d.executor Thread-11-__acker-executor[5 5] 
[WARN] Prepared bolt __acker:(6)
+
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/b68b2ea5/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-2-TestAgg2-4-5/6700/worker.log
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-2-TestAgg2-4-5/6700/worker.log
 
b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-2-TestAgg2-4-5/6700/worker.log
new file mode 100644
index 0000000..248d6a4
--- /dev/null
+++ 
b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-2-TestAgg2-4-5/6700/worker.log
@@ -0,0 +1,9 @@
+2017-10-23 13:41:43.481 o.a.s.d.executor Thread-11-__acker-executor[7 8] 
[INFO] Preparing bolt __acker:(1)
+2017-10-23 13:41:43.483 o.a.s.d.executor Thread-11-__acker-executor[7 8] 
[WARN] Prepared bolt __acker:(2)
+2017-10-23 13:41:48.834 c.h.s.s.n.EmailNotifier 
Thread-7-8-NOTIFICATION-executor[3 3] [ERROR] Got exception while initializing 
transport
+2017-10-23 13:41:58.242 o.a.s.d.executor main [INFO] Loading executor 
3-NOTIFICATION:[9 1]
+2017-10-23 13:41:59.242 o.a.s.d.executor Thread-11-__acker-executor[7 8] 
[WARN] Prepared bolt __acker:(3)
+
+
+
+

Reply via email to