Repository: ambari Updated Branches: refs/heads/branch-2.6 973ec4640 -> b68b2ea53
AMBARI-22600. LogFeeder: filters for wildcard input paths need to be cloned (oleewere) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b68b2ea5 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b68b2ea5 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b68b2ea5 Branch: refs/heads/branch-2.6 Commit: b68b2ea53e48fa351070e9253ce1f240c46458c7 Parents: 973ec46 Author: Oliver Szabo <[email protected]> Authored: Tue Dec 5 20:50:13 2017 +0100 Committer: Oliver Szabo <[email protected]> Committed: Wed Dec 6 16:13:12 2017 +0100 ---------------------------------------------------------------------- .../apache/ambari/logfeeder/filter/Filter.java | 8 ++++- .../ambari/logfeeder/filter/FilterGrok.java | 7 +++-- .../apache/ambari/logfeeder/input/Input.java | 33 ++++++++++++++++++++ .../shipper-conf/input.config-storm.json | 10 +++--- .../streamline-1-TestAgg-2-3/6701/worker.log | 10 ++++-- .../streamline-2-TestAgg2-4-5/6700/worker.log | 9 ++++++ 6 files changed, 66 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/b68b2ea5/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java index afd903e..bd760f7 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java @@ -36,7 +36,8 @@ import org.apache.ambari.logfeeder.util.AliasUtil.AliasType; import org.apache.log4j.Logger; import org.apache.log4j.Priority; -public abstract class Filter extends ConfigBlock { +public abstract class Filter extends ConfigBlock implements Cloneable { + private static final Logger LOG = Logger.getLogger(Filter.class); protected Input input; @@ -55,6 +56,11 @@ public abstract class Filter extends ConfigBlock { } } + @Override + public Object clone() throws CloneNotSupportedException { + return super.clone(); + } + @SuppressWarnings("unchecked") private void initializePostMapValues() { Map<String, Object> postMapValues = (Map<String, Object>) getConfigValue("post_map_values"); http://git-wip-us.apache.org/repos/asf/ambari/blob/b68b2ea5/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java index 49d7e76..e0ae4f0 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java @@ -62,6 +62,7 @@ public class FilterGrok extends Filter { private String sourceField = null; private boolean removeSourceField = true; + private boolean skipOnError = false; private Set<String> namedParamList = new HashSet<String>(); private Set<String> multiLineamedParamList = new HashSet<String>(); @@ -80,6 +81,8 @@ public class FilterGrok extends Filter { sourceField = getStringValue("source_field"); removeSourceField = getBooleanValue("remove_source_field", removeSourceField); + skipOnError = getBooleanValue("skip_on_error", false); + LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " + getShortDescription()); @@ -181,7 +184,7 @@ public class FilterGrok extends Filter { if (grokMultiline != null) { String jsonStr = grokMultiline.capture(inputStr); - if (!"{}".equals(jsonStr)) { + if (!"{}".equals(jsonStr) || skipOnError) { if (strBuff != null) { Map<String, Object> jsonObj = Collections.synchronizedMap(new HashMap<String, Object>()); try { @@ -226,7 +229,7 @@ public class FilterGrok extends Filter { String jsonStr = grokMessage.capture(inputStr); boolean parseError = false; - if ("{}".equals(jsonStr)) { + if ("{}".equals(jsonStr) && !skipOnError) { parseError = true; logParseError(inputStr); http://git-wip-us.apache.org/repos/asf/ambari/blob/b68b2ea5/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java index 7df0b6e..3808605 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java @@ -137,6 +137,10 @@ public abstract class Input extends ConfigBlock implements Runnable, Cloneable { } } + public void setFirstFilter(Filter firstFilter) { + this.firstFilter = firstFilter; + } + public void addOutput(Output output) { outputList.add(output); } @@ -200,6 +204,7 @@ public abstract class Input extends ConfigBlock implements Runnable, Cloneable { clonedObject.setLogFileDetacherThread(null); clonedObject.setLogFilePathUpdaterThread(null); clonedObject.setInputChildMap(new HashMap<String, InputFile>()); + copyFilters(clonedObject, firstFilter); Thread thread = new Thread(threadGroup, clonedObject, "file=" + fullPathWithWildCard); clonedObject.setThread(thread); inputChildMap.put(fullPathWithWildCard, clonedObject); @@ -207,6 +212,34 @@ public abstract class Input extends ConfigBlock implements Runnable, Cloneable { } } + private void copyFilters(InputFile clonedInput, Filter firstFilter) { + if (firstFilter != null) { + try { + LOG.info("Cloning filters for input=" + clonedInput.logPath); + Filter newFilter = (Filter) firstFilter.clone(); + newFilter.setInput(clonedInput); + clonedInput.setFirstFilter(newFilter); + Filter actFilter = firstFilter; + Filter actClonedFilter = newFilter; + while (actFilter != null) { + if (actFilter.getNextFilter() != null) { + actFilter = actFilter.getNextFilter(); + Filter newClonedFilter = (Filter) actFilter.clone(); + newClonedFilter.setInput(clonedInput); + actClonedFilter.setNextFilter(newClonedFilter); + actClonedFilter = newClonedFilter; + } else { + actClonedFilter.setNextFilter(null); + actFilter = null; + } + } + LOG.info("Cloning filters has finished for input=" + clonedInput.logPath); + } catch (Exception e) { + LOG.error("Could not clone filters for input=" + clonedInput.logPath); + } + } + } + public void stopChildInputFileThread(String folderPathKey) { LOG.info("Stop child input thread - " + folderPathKey); String filePath = new File(getFilePath()).getName(); http://git-wip-us.apache.org/repos/asf/ambari/blob/b68b2ea5/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json ---------------------------------------------------------------------- diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json index 34bdcf0..68e6fcf 100644 --- a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json +++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json @@ -20,7 +20,7 @@ }, "log4j_format":"", "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})", - "message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}\\s%{JAVACLASS:logger_name}\\s%{DATA:thread_name}\\s\\[%{LOGLEVEL:level}\\]\\s%{GREEDYDATA:log_message}", + "message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{JAVACLASS:logger_name}\\s%{GREEDYDATA:thread_name}\\s\\[%{LOGLEVEL:level}\\]\\s%{GREEDYDATA:log_message}", "post_map_values":{ "logtime":{ "map_date":{ @@ -41,7 +41,7 @@ }, "source_field": "thread_name", "remove_source_field": "false", - "message_pattern":"(Thread\\-[\\-0-9]+\\-*[\\-0-9]*\\-%{DATA:storm_component_name}\\-executor%{DATA}|%{DATA})" + "message_pattern":"(Thread\\-[\\-0-9]+\\-*[\\-0-9]*\\-%{DATA:sdi_storm_component_name}\\-executor%{DATA}|%{DATA:thread_name})" }, { "filter":"grok", @@ -55,7 +55,7 @@ }, "source_field": "path", "remove_source_field": "false", - "message_pattern":"/root/test-logs/storm/worker-logs/%{DATA:storm_topology_id}/%{DATA:storm_worker_port}/worker\\.log" + "message_pattern":"/root/test-logs/storm/worker-logs/%{DATA:sdi_storm_topology_id}/%{DATA:sdi_storm_worker_port}/worker\\.log" }, { "filter":"grok", @@ -67,9 +67,9 @@ ] } }, - "source_field": "storm_topology_id", + "source_field": "sdi_storm_topology_id", "remove_source_field": "false", - "message_pattern":"(streamline\\-%{DATA:streamline_topology_id}\\-%{DATA:streamline_topology_name}\\-[0-9]+\\-[0-9]+)|(%{GREEDYDATA})" + "message_pattern":"(streamline\\-%{DATA:sdi_streamline_topology_id}\\-%{DATA:sdi_streamline_topology_name}\\-[0-9]+\\-[0-9]+)|(%{DATA:sdi_storm_topology_id})" } ] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b68b2ea5/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log ---------------------------------------------------------------------- diff --git a/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log index 6a10ad9..787b9ee 100644 --- a/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log +++ b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log @@ -1,4 +1,8 @@ -2017-10-23 13:41:43.481 o.a.s.d.executor Thread-11-__acker-executor[5 5] [INFO] Preparing bolt __acker:(5) -2017-10-23 13:41:43.483 o.a.s.d.executor Thread-11-__acker-executor[5 5] [INFO] Prepared bolt __acker:(5) +2017-10-23 13:41:43.481 o.a.s.d.executor Thread-11-__acker-executor[5 5] [INFO] Preparing bolt __acker:(4) +2017-10-23 13:41:43.483 o.a.s.d.executor Thread-11-__acker-executor[5 5] [WARN] Prepared bolt __acker:(5) 2017-10-23 13:41:48.834 c.h.s.s.n.EmailNotifier Thread-5-3-NOTIFICATION-executor[3 3] [ERROR] Got exception while initializing transport -2017-10-23 13:41:58.242 o.a.s.d.executor main [INFO] Loading executor 3-NOTIFICATION:[3 3] \ No newline at end of file +2017-10-23 13:41:58.242 o.a.s.d.executor main [INFO] Loading executor 3-NOTIFICATION:[3 1] +2017-10-23 13:41:59.242 o.a.s.d.executor Thread-11-__acker-executor[5 5] [WARN] Prepared bolt __acker:(6) + + + http://git-wip-us.apache.org/repos/asf/ambari/blob/b68b2ea5/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-2-TestAgg2-4-5/6700/worker.log ---------------------------------------------------------------------- diff --git a/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-2-TestAgg2-4-5/6700/worker.log b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-2-TestAgg2-4-5/6700/worker.log new file mode 100644 index 0000000..248d6a4 --- /dev/null +++ b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-2-TestAgg2-4-5/6700/worker.log @@ -0,0 +1,9 @@ +2017-10-23 13:41:43.481 o.a.s.d.executor Thread-11-__acker-executor[7 8] [INFO] Preparing bolt __acker:(1) +2017-10-23 13:41:43.483 o.a.s.d.executor Thread-11-__acker-executor[7 8] [WARN] Prepared bolt __acker:(2) +2017-10-23 13:41:48.834 c.h.s.s.n.EmailNotifier Thread-7-8-NOTIFICATION-executor[3 3] [ERROR] Got exception while initializing transport +2017-10-23 13:41:58.242 o.a.s.d.executor main [INFO] Loading executor 3-NOTIFICATION:[9 1] +2017-10-23 13:41:59.242 o.a.s.d.executor Thread-11-__acker-executor[7 8] [WARN] Prepared bolt __acker:(3) + + + +
