This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3c4209e AMBARI-22440. Log Feeder: wildcard support & checkpoint
cleanup on trunk
3c4209e is described below
commit 3c4209ed6f7dc073c0e06c6587ca506a97bac4cb
Author: Oliver Szabo <[email protected]>
AuthorDate: Mon Apr 2 20:34:39 2018 +0200
AMBARI-22440. Log Feeder: wildcard support & checkpoint cleanup on trunk
---
.../model/inputconfig/FilterGrokDescriptor.java | 4 +
.../api/model/inputconfig/InputDescriptor.java | 2 +
.../api/model/inputconfig/InputFileDescriptor.java | 7 +
.../inputconfig/impl/FilterGrokDescriptorImpl.java | 38 ++
.../inputconfig/impl/InputDescriptorImpl.java | 21 +
.../inputconfig/impl/InputFileDescriptorImpl.java | 83 +++
.../ambari/logfeeder/plugin/common/ConfigItem.java | 3 +-
.../plugin/common/LogFeederProperties.java | 3 +-
.../ambari/logfeeder/plugin/common/MetricData.java | 4 +-
.../ambari/logfeeder/plugin/filter/Filter.java | 4 +
.../ambari/logfeeder/plugin/input/Input.java | 15 +-
.../logfeeder/plugin/input/cache/LRUCache.java | 3 +-
.../ambari-logsearch-logfeeder/pom.xml | 5 +
.../apache/ambari/logfeeder/filter/FilterGrok.java | 25 +-
.../apache/ambari/logfeeder/input/InputFile.java | 220 ++++++-
.../ambari/logfeeder/input/InputManagerImpl.java | 12 +
.../logfeeder/input/file/FileCheckInHelper.java | 4 +-
.../input/monitor/AbstractLogFileMonitor.java | 64 ++
.../input/monitor/CheckpointCleanupMonitor.java | 48 ++
.../input/monitor/LogFileDetachMonitor.java | 79 +++
.../input/monitor/LogFilePathUpdateMonitor.java | 74 +++
.../ambari/logfeeder/output/OutputManagerImpl.java | 21 +-
.../org/apache/ambari/logfeeder/util/FileUtil.java | 137 ++++-
.../ambari/logfeeder/util/LogFeederUtil.java | 24 +
.../logsearch/model/common/LSServerFilterGrok.java | 24 +
.../logsearch/model/common/LSServerInput.java | 8 +
.../logsearch/model/common/LSServerInputFile.java | 52 ++
.../shipper-conf/input.config-ambari.json | 641 +++++++--------------
.../logfeeder/shipper-conf/input.config-storm.json | 75 +++
.../streamline-1-TestAgg-2-3/6700/worker.log | 5 +
.../streamline-1-TestAgg-2-3/6701/worker.log | 5 +
31 files changed, 1209 insertions(+), 501 deletions(-)
diff --git
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterGrokDescriptor.java
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterGrokDescriptor.java
index 039e1ff..9fc8eb4 100644
---
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterGrokDescriptor.java
+++
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterGrokDescriptor.java
@@ -27,4 +27,8 @@ public interface FilterGrokDescriptor extends
FilterDescriptor {
String getMessagePattern();
void setMultilinePattern(String multilinePattern);
+
+ boolean isSkipOnError();
+
+ boolean isDeepExtract();
}
diff --git
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
index d3a43c1..71f3cd1 100644
---
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
+++
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
@@ -51,4 +51,6 @@ public interface InputDescriptor {
Boolean isEnabled();
String getGroup();
+
+ Boolean isInitDefaultFields();
}
diff --git
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
index 0070ad9..b58db6a 100644
---
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
+++
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
@@ -20,4 +20,11 @@
package org.apache.ambari.logsearch.config.api.model.inputconfig;
public interface InputFileDescriptor extends InputFileBaseDescriptor {
+ Integer getDetachIntervalMin();
+
+ Integer getDetachTimeMin();
+
+ Integer getPathUpdateIntervalMin();
+
+ Integer getMaxAgeMin();
}
diff --git
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterGrokDescriptorImpl.java
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterGrokDescriptorImpl.java
index e140df0..9823163 100644
---
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterGrokDescriptorImpl.java
+++
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterGrokDescriptorImpl.java
@@ -61,6 +61,26 @@ public class FilterGrokDescriptorImpl extends
FilterDescriptorImpl implements Fi
@SerializedName("message_pattern")
private String messagePattern;
+ @ShipperConfigElementDescription(
+ path = "/filter/[]/skip_on_error",
+ type = "boolean",
+ description = "Skip filter if an error occurred during applying the grok
filter.",
+ examples = {"true"}
+ )
+ @Expose
+ @SerializedName("skip_on_error")
+ private boolean skipOnError;
+
+ @ShipperConfigElementDescription(
+ path = "/filter/[]/deep_extract",
+ type = "boolean",
+ description = "",
+ examples = {""}
+ )
+ @Expose
+ @SerializedName("deep_extract")
+ private boolean deepExtract;
+
@Override
public String getLog4jFormat() {
return log4jFormat;
@@ -81,6 +101,24 @@ public class FilterGrokDescriptorImpl extends
FilterDescriptorImpl implements Fi
}
@Override
+ public boolean isSkipOnError() {
+ return this.skipOnError;
+ }
+
+ public void setSkipOnError(boolean skipOnError) {
+ this.skipOnError = skipOnError;
+ }
+
+ @Override
+ public boolean isDeepExtract() {
+ return deepExtract;
+ }
+
+ public void setDeepExtract(boolean deepExtract) {
+ this.deepExtract = deepExtract;
+ }
+
+ @Override
public String getMessagePattern() {
return messagePattern;
}
diff --git
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
index 0994b9f..40886be 100644
---
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
+++
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
@@ -188,6 +188,18 @@ public abstract class InputDescriptorImpl implements
InputDescriptor {
@SerializedName("is_enabled")
private Boolean isEnabled;
+
+ @ShipperConfigElementDescription(
+ path = "/input/[]/init_default_fields",
+ type = "boolean",
+ description = "Init default fields (ip, path etc.) before applying the
filter.",
+ examples = {"true", "false"},
+ defaultValue = "false"
+ )
+ @Expose
+ @SerializedName("init_default_fields")
+ private Boolean initDefaultFields;
+
public String getType() {
return type;
}
@@ -308,4 +320,13 @@ public abstract class InputDescriptorImpl implements
InputDescriptor {
public void setGroup(String group) {
this.group = group;
}
+
+ @Override
+ public Boolean isInitDefaultFields() {
+ return this.initDefaultFields;
+ }
+
+ public void setInitDefaultFields(Boolean initDefaultFields) {
+ this.initDefaultFields = initDefaultFields;
+ }
}
diff --git
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
index 3bfd161..99b42fe 100644
---
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
+++
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
@@ -19,7 +19,90 @@
package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+import org.apache.ambari.logsearch.config.api.ShipperConfigElementDescription;
import
org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl
implements InputFileDescriptor {
+
+ @ShipperConfigElementDescription(
+ path = "/input/[]/detach_interval_min",
+ type = "integer",
+ description = "The period in minutes for checking which files are too old
(default: 300)",
+ examples = {"60"},
+ defaultValue = "300"
+ )
+ @Expose
+ @SerializedName("detach_interval_min")
+ private Integer detachIntervalMin;
+
+ @ShipperConfigElementDescription(
+ path = "/input/[]/detach_time_min",
+ type = "integer",
+ description = "The period in minutes after which the application flags a file
as too old (default: 2000)",
+ examples = {"60"},
+ defaultValue = "2000"
+ )
+ @Expose
+ @SerializedName("detach_time_min")
+ private Integer detachTimeMin;
+
+ @ShipperConfigElementDescription(
+ path = "/input/[]/path_update_interval_min",
+ type = "integer",
+ description = "The period in minutes for checking new files (default: 5;
based on detach values, it's possible that a new input won't be monitored)",
+ examples = {"5"},
+ defaultValue = "5"
+ )
+ @Expose
+ @SerializedName("path_update_interval_min")
+ private Integer pathUpdateIntervalMin;
+
+ @ShipperConfigElementDescription(
+ path = "/input/[]/max_age_min",
+ type = "integer",
+ description = "If the file has not been modified for a long time (this time
value in minutes), then the checkpoint file can be deleted.",
+ examples = {"2000"},
+ defaultValue = "0"
+ )
+ @Expose
+ @SerializedName("max_age_min")
+ private Integer maxAgeMin;
+
+ @Override
+ public Integer getDetachIntervalMin() {
+ return this.detachIntervalMin;
+ }
+
+ @Override
+ public Integer getDetachTimeMin() {
+ return this.detachTimeMin;
+ }
+
+ @Override
+ public Integer getPathUpdateIntervalMin() {
+ return this.pathUpdateIntervalMin;
+ }
+
+ @Override
+ public Integer getMaxAgeMin() {
+ return this.maxAgeMin;
+ }
+
+ public void setDetachIntervalMin(Integer detachIntervalMin) {
+ this.detachIntervalMin = detachIntervalMin;
+ }
+
+ public void setDetachTimeMin(Integer detachTimeMin) {
+ this.detachTimeMin = detachTimeMin;
+ }
+
+ public void setPathUpdateIntervalMin(Integer pathUpdateIntervalMin) {
+ this.pathUpdateIntervalMin = pathUpdateIntervalMin;
+ }
+
+ public void setMaxAgeMin(Integer maxAgeMin) {
+ this.maxAgeMin = maxAgeMin;
+ }
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
index f9d4a7a..1cbbfd5 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
@@ -24,12 +24,13 @@ import com.google.gson.reflect.TypeToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public abstract class ConfigItem<PROP_TYPE extends LogFeederProperties>
implements Cloneable {
+public abstract class ConfigItem<PROP_TYPE extends LogFeederProperties>
implements Cloneable, Serializable {
private static final Logger LOG = LoggerFactory.getLogger(ConfigItem.class);
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java
index a1e5c12..7fac01a 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java
@@ -18,12 +18,13 @@
*/
package org.apache.ambari.logfeeder.plugin.common;
+import java.io.Serializable;
import java.util.Properties;
/**
* Static application level configuration interface for Log Feeder
*/
-public interface LogFeederProperties {
+public interface LogFeederProperties extends Serializable {
/**
* Get all key-value pairs from static application level Log Feeder
configuration
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java
index 933f131..54cdb7e 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java
@@ -18,7 +18,9 @@
*/
package org.apache.ambari.logfeeder.plugin.common;
-public class MetricData {
+import java.io.Serializable;
+
+public class MetricData implements Serializable {
public final String metricsName;
public final boolean isPointInTime;
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java
index e2180d7..f098245 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java
@@ -184,4 +184,8 @@ public abstract class Filter<PROP_TYPE extends
LogFeederProperties> extends Conf
// no metrics yet
return null;
}
+
+ public Object clone() throws CloneNotSupportedException {
+ return super.clone();
+ }
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
index 368f7d5..a586510 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
@@ -56,6 +56,7 @@ public abstract class Input<PROP_TYPE extends
LogFeederProperties, INPUT_MARKER
private Thread thread;
private LRUCache cache;
private String cacheKeyField;
+ private boolean initDefaultFields;
protected MetricData readBytesMetric = new
MetricData(getReadBytesMetricName(), false);
public void loadConfigs(InputDescriptor inputDescriptor, PROP_TYPE
logFeederProperties,
@@ -76,8 +77,6 @@ public abstract class Input<PROP_TYPE extends
LogFeederProperties, INPUT_MARKER
public abstract boolean monitor();
- public abstract List<? extends Input> getChildInputs();
-
public abstract INPUT_MARKER getInputMarker();
public abstract boolean isReady();
@@ -337,4 +336,16 @@ public abstract class Input<PROP_TYPE extends
LogFeederProperties, INPUT_MARKER
public String toString() {
return getShortDescription();
}
+
+ public void setFirstFilter(Filter<PROP_TYPE> firstFilter) {
+ this.firstFilter = firstFilter;
+ }
+
+ public boolean isInitDefaultFields() {
+ return initDefaultFields;
+ }
+
+ public void setInitDefaultFields(boolean initDefaultFields) {
+ this.initDefaultFields = initDefaultFields;
+ }
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java
index e0509fe..9ee8743 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java
@@ -20,6 +20,7 @@ package org.apache.ambari.logfeeder.plugin.input.cache;
import com.google.common.collect.EvictingQueue;
+import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -28,7 +29,7 @@ import java.util.Map;
* It won't put already existing entries into the cache map if de-duplication
interval not higher then a specific value
* or if the new value is the most recently used one (in case of
lastDedupEnabled is true)
*/
-public class LRUCache {
+public class LRUCache implements Serializable {
private final LinkedHashMap<String, Long> keyValueMap;
private final String fileName;
private final long dedupInterval;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index 884f49f..688c944 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -215,6 +215,11 @@
<artifactId>netty-all</artifactId>
<version>4.0.37.Final</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.ant</groupId>
+ <artifactId>ant</artifactId>
+ <version>1.7.1</version>
+ </dependency>
<!-- Exclude jars globally-->
<dependency>
<groupId>commons-beanutils</groupId>
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 21b1ea4..2074f93 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -71,6 +71,8 @@ public class FilterGrok extends Filter<LogFeederProps> {
private MetricData grokErrorMetric = new MetricData("filter.error.grok",
false);
+ private boolean skipOnError = false;
+
@Override
public void init(LogFeederProps logFeederProps) throws Exception {
super.init(logFeederProps);
@@ -80,6 +82,7 @@ public class FilterGrok extends Filter<LogFeederProps> {
multilinePattern =
escapePattern(((FilterGrokDescriptor)getFilterDescriptor()).getMultilinePattern());
sourceField = getFilterDescriptor().getSourceField();
removeSourceField =
BooleanUtils.toBooleanDefaultIfNull(getFilterDescriptor().isRemoveSourceField(),
removeSourceField);
+ skipOnError = ((FilterGrokDescriptor)
getFilterDescriptor()).isSkipOnError();
LOG.info("init() done. grokPattern=" + messagePattern + ",
multilinePattern=" + multilinePattern + ", " +
getShortDescription());
@@ -92,6 +95,11 @@ public class FilterGrok extends Filter<LogFeederProps> {
grokMessage = new Grok();
loadPatterns(grokMessage);
grokMessage.compile(messagePattern);
+ if (((FilterGrokDescriptor)getFilterDescriptor()).isDeepExtract()) {
+ extractNamedParams(grokMessage.getNamedRegexCollection());
+ } else {
+ extractNamedParams(messagePattern, namedParamList);
+ }
if (!StringUtils.isEmpty(multilinePattern)) {
extractNamedParams(multilinePattern, multiLineamedParamList);
@@ -134,6 +142,16 @@ public class FilterGrok extends Filter<LogFeederProps> {
}
}
+ private void extractNamedParams(Map<String, String> namedRegexCollection) {
+ if (namedRegexCollection != null) {
+ for (String paramValue : namedRegexCollection.values()) {
+ if (paramValue.toLowerCase().equals(paramValue)) {
+ namedParamList.add(paramValue);
+ }
+ }
+ }
+ }
+
private boolean loadPatterns(Grok grok) {
InputStreamReader grokPatternsReader = null;
LOG.info("Loading pattern file " + GROK_PATTERN_FILE);
@@ -166,10 +184,11 @@ public class FilterGrok extends Filter<LogFeederProps> {
if (grokMultiline != null) {
String jsonStr = grokMultiline.capture(inputStr);
- if (!"{}".equals(jsonStr)) {
+ if (!"{}".equals(jsonStr) || skipOnError) {
if (strBuff != null) {
Map<String, Object> jsonObj = Collections.synchronizedMap(new
HashMap<String, Object>());
try {
+ LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker,
false);
applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr);
} finally {
strBuff = null;
@@ -189,6 +208,7 @@ public class FilterGrok extends Filter<LogFeederProps> {
} else {
savedInputMarker = inputMarker;
Map<String, Object> jsonObj = Collections.synchronizedMap(new
HashMap<String, Object>());
+ LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, false);
applyMessage(inputStr, jsonObj, null);
}
}
@@ -197,6 +217,7 @@ public class FilterGrok extends Filter<LogFeederProps> {
public void apply(Map<String, Object> jsonObj, InputMarker inputMarker)
throws Exception {
if (sourceField != null) {
savedInputMarker = inputMarker;
+ LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, false);
applyMessage((String) jsonObj.get(sourceField), jsonObj, null);
if (removeSourceField) {
jsonObj.remove(sourceField);
@@ -208,7 +229,7 @@ public class FilterGrok extends Filter<LogFeederProps> {
String jsonStr = grokMessage.capture(inputStr);
boolean parseError = false;
- if ("{}".equals(jsonStr)) {
+ if ("{}".equals(jsonStr) && !skipOnError) {
parseError = true;
logParseError(inputStr);
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 8b5310f..726a237 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -20,15 +20,18 @@ package org.apache.ambari.logfeeder.input;
import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.input.monitor.LogFileDetachMonitor;
+import org.apache.ambari.logfeeder.input.monitor.LogFilePathUpdateMonitor;
import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
import org.apache.ambari.logfeeder.input.file.FileCheckInHelper;
import org.apache.ambari.logfeeder.input.file.ProcessFileHelper;
import org.apache.ambari.logfeeder.input.file.ResumeLineNumberHelper;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
import org.apache.ambari.logfeeder.plugin.input.Input;
import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import
org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
import
org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
-import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang3.ArrayUtils;
@@ -39,12 +42,7 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileFilter;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
public class InputFile extends Input<LogFeederProps, InputFileMarker> {
@@ -55,6 +53,10 @@ public class InputFile extends Input<LogFeederProps,
InputFileMarker> {
private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 1000;
+ private static final int DEFAULT_DETACH_INTERVAL_MIN = 300;
+ private static final int DEFAULT_DETACH_TIME_MIN = 2000;
+ private static final int DEFAULT_LOG_PATH_UPDATE_INTERVAL_MIN = 5;
+
private boolean isReady;
private boolean tail;
@@ -66,6 +68,10 @@ public class InputFile extends Input<LogFeederProps,
InputFileMarker> {
private String base64FileKey;
private String checkPointExtension;
private int checkPointIntervalMS;
+ private int detachIntervalMin;
+ private int detachTimeMin;
+ private int pathUpdateIntervalMin;
+ private Integer maxAgeMin;
private Map<String, File> checkPointFiles = new HashMap<>();
private Map<String, Long> lastCheckPointTimeMSs = new HashMap<>();
@@ -73,12 +79,21 @@ public class InputFile extends Input<LogFeederProps,
InputFileMarker> {
private Map<String, InputFileMarker> lastCheckPointInputMarkers = new
HashMap<>();
private Thread thread;
+ private Thread logFileDetacherThread;
+ private Thread logFilePathUpdaterThread;
+ private ThreadGroup threadGroup;
+
+ private boolean multiFolder = false;
+ private Map<String, List<File>> folderMap;
+ private Map<String, InputFile> inputChildMap = new HashMap<>();
@Override
public boolean isReady() {
if (!isReady) {
// Let's try to check whether the file is available
- logFiles = getActualFiles(logPath);
+ logFiles = getActualInputLogFiles();
+ Map<String, List<File>> foldersMap =
FileUtil.getFoldersForFiles(logFiles);
+ setFolderMap(foldersMap);
if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) {
if (tail && logFiles.length > 1) {
LOG.warn("Found multiple files (" + logFiles.length + ") for the
file filter " + filePath +
@@ -132,26 +147,32 @@ public class InputFile extends Input<LogFeederProps,
InputFileMarker> {
return "input.files.read_bytes";
}
- private File[] getActualFiles(String searchPath) {
- File searchFile = new File(searchPath);
- if (!searchFile.getParentFile().exists()) {
- return new File[0];
- } else if (searchFile.isFile()) {
- return new File[]{searchFile};
- } else {
- FileFilter fileFilter = new WildcardFileFilter(searchFile.getName());
- File[] logFiles = searchFile.getParentFile().listFiles(fileFilter);
- Arrays.sort(logFiles, Comparator.comparing(File::getName));
- return logFiles;
- }
- }
-
@Override
public boolean monitor() {
if (isReady()) {
- LOG.info("Starting thread. " + getShortDescription());
- thread = new Thread(this, getNameForThread());
- thread.start();
+ if (multiFolder) {
+ try {
+ threadGroup = new ThreadGroup(getNameForThread());
+ if (getFolderMap() != null) {
+ for (Map.Entry<String, List<File>> folderFileEntry :
getFolderMap().entrySet()) {
+ startNewChildInputFileThread(folderFileEntry);
+ }
+ logFilePathUpdaterThread = new Thread(new
LogFilePathUpdateMonitor((InputFile) this, pathUpdateIntervalMin,
detachTimeMin), "logfile_path_updater=" + filePath);
+ logFilePathUpdaterThread.setDaemon(true);
+ logFileDetacherThread = new Thread(new
LogFileDetachMonitor((InputFile) this, detachIntervalMin, detachTimeMin),
"logfile_detacher=" + filePath);
+ logFileDetacherThread.setDaemon(true);
+
+ logFilePathUpdaterThread.start();
+ logFileDetacherThread.start();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ LOG.info("Starting thread. " + getShortDescription());
+ thread = new Thread(this, getNameForThread());
+ thread.start();
+ }
return true;
} else {
return false;
@@ -159,11 +180,6 @@ public class InputFile extends Input<LogFeederProps,
InputFileMarker> {
}
@Override
- public List<InputFile> getChildInputs() {
- return null;
- }
-
- @Override
public InputFileMarker getInputMarker() {
return null;
}
@@ -179,15 +195,29 @@ public class InputFile extends Input<LogFeederProps,
InputFileMarker> {
setClosed(true);
logPath = getInputDescriptor().getPath();
checkPointIntervalMS = (int)
ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)getInputDescriptor()).getCheckpointIntervalMs(),
DEFAULT_CHECKPOINT_INTERVAL_MS);
-
+ detachIntervalMin = (int)
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDetachIntervalMin(),
DEFAULT_DETACH_INTERVAL_MIN * 60);
+ detachTimeMin = (int)
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDetachTimeMin(),
DEFAULT_DETACH_TIME_MIN * 60);
+ pathUpdateIntervalMin = (int)
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getPathUpdateIntervalMin(),
DEFAULT_LOG_PATH_UPDATE_INTERVAL_MIN * 60);
+ maxAgeMin = (int)
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getMaxAgeMin(),
0);
+ boolean initDefaultFields =
BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().isInitDefaultFields(),
false);
+ setInitDefaultFields(initDefaultFields);
if (StringUtils.isEmpty(logPath)) {
LOG.error("path is empty for file input. " + getShortDescription());
return;
}
setFilePath(logPath);
+ // Check there can have pattern in folder
+ if (getFilePath() != null && getFilePath().contains("/")) {
+ int lastIndexOfSlash = getFilePath().lastIndexOf("/");
+ String folderBeforeLogName = getFilePath().substring(0,
lastIndexOfSlash);
+ if (folderBeforeLogName.contains("*")) {
+ LOG.info("Found regex in folder path ('" + getFilePath() + "'), will
check against multiple folders.");
+ setMultiFolder(true);
+ }
+ }
boolean isFileReady = isReady();
- LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" +
isReady());
+ LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" +
isFileReady);
LogEntryCacheConfig cacheConfig = logFeederProps.getLogEntryCacheConfig();
initCache(
@@ -265,6 +295,73 @@ public class InputFile extends Input<LogFeederProps,
InputFileMarker> {
}
}
+ public void startNewChildInputFileThread(Map.Entry<String, List<File>>
folderFileEntry) throws CloneNotSupportedException {
+ LOG.info("Start child input thread - " + folderFileEntry.getKey());
+ InputFile clonedObject = (InputFile) this.clone();
+ String folderPath = folderFileEntry.getKey();
+ String filePath = new File(getFilePath()).getName();
+ String fullPathWithWildCard = String.format("%s/%s", folderPath, filePath);
+ if (clonedObject.getMaxAgeMin() != 0 && FileUtil.isFileTooOld(new
File(fullPathWithWildCard), clonedObject.getMaxAgeMin().longValue())) {
+ LOG.info(String.format("File ('%s') is too old (max age min: %d),
monitor thread not starting...", getFilePath(), clonedObject.getMaxAgeMin()));
+ } else {
+ clonedObject.setMultiFolder(false);
+ clonedObject.logFiles = folderFileEntry.getValue().toArray(new File[0]);
// TODO: works only with tail
+ clonedObject.logPath = fullPathWithWildCard;
+ clonedObject.setLogFileDetacherThread(null);
+ clonedObject.setLogFilePathUpdaterThread(null);
+ clonedObject.setInputChildMap(new HashMap<>());
+ copyFilters(clonedObject, getFirstFilter());
+ Thread thread = new Thread(threadGroup, clonedObject, "file=" +
fullPathWithWildCard);
+ clonedObject.setThread(thread);
+ inputChildMap.put(fullPathWithWildCard, clonedObject);
+ thread.start();
+ }
+ }
+
+ private void copyFilters(InputFile clonedInput, Filter firstFilter) {
+ if (firstFilter != null) {
+ try {
+ LOG.info("Cloning filters for input=" + clonedInput.logPath);
+ Filter newFilter = (Filter) firstFilter.clone();
+ newFilter.setInput(clonedInput);
+ clonedInput.setFirstFilter(newFilter);
+ Filter actFilter = firstFilter;
+ Filter actClonedFilter = newFilter;
+ while (actFilter != null) {
+ if (actFilter.getNextFilter() != null) {
+ actFilter = actFilter.getNextFilter();
+ Filter newClonedFilter = (Filter) actFilter.clone();
+ newClonedFilter.setInput(clonedInput);
+ actClonedFilter.setNextFilter(newClonedFilter);
+ actClonedFilter = newClonedFilter;
+ } else {
+ actClonedFilter.setNextFilter(null);
+ actFilter = null;
+ }
+ }
+ LOG.info("Cloning filters has finished for input=" +
clonedInput.logPath);
+ } catch (Exception e) {
+ LOG.error("Could not clone filters for input=" + clonedInput.logPath);
+ }
+ }
+ }
+
+ public void stopChildInputFileThread(String folderPathKey) {
+ LOG.info("Stop child input thread - " + folderPathKey);
+ String filePath = new File(getFilePath()).getName();
+ String fullPathWithWildCard = String.format("%s/%s", folderPathKey,
filePath);
+ if (inputChildMap.containsKey(fullPathWithWildCard)) {
+ InputFile inputFile = inputChildMap.get(fullPathWithWildCard);
+ inputFile.setClosed(true);
+ if (inputFile.getThread() != null && inputFile.getThread().isAlive()) {
+ inputFile.getThread().interrupt();
+ }
+ inputChildMap.remove(fullPathWithWildCard);
+ } else {
+ LOG.warn(fullPathWithWildCard + " not found as an input child.");
+ }
+ }
+
@Override
public boolean isEnabled() {
return BooleanUtils.isNotFalse(getInputDescriptor().isEnabled());
@@ -291,6 +388,10 @@ public class InputFile extends Input<LogFeederProps,
InputFileMarker> {
setClosed(true);
}
+ public File[] getActualInputLogFiles() {
+ return FileUtil.getInputFilesByPattern(logPath);
+ }
+
public String getFilePath() {
return filePath;
}
@@ -354,4 +455,59 @@ public class InputFile extends Input<LogFeederProps,
InputFileMarker> {
public Map<String, InputFileMarker> getLastCheckPointInputMarkers() {
return lastCheckPointInputMarkers;
}
+
+ public boolean isMultiFolder() {
+ return multiFolder;
+ }
+
+ public void setMultiFolder(boolean multiFolder) {
+ this.multiFolder = multiFolder;
+ }
+
+ public Map<String, List<File>> getFolderMap() {
+ return folderMap;
+ }
+
+ public void setFolderMap(Map<String, List<File>> folderMap) {
+ this.folderMap = folderMap;
+ }
+
+ public Map<String, InputFile> getInputChildMap() {
+ return inputChildMap;
+ }
+
+ public void setInputChildMap(Map<String, InputFile> inputChildMap) {
+ this.inputChildMap = inputChildMap;
+ }
+
+ @Override
+ public Thread getThread() {
+ return thread;
+ }
+
+ @Override
+ public void setThread(Thread thread) {
+ this.thread = thread;
+ }
+
+ public Thread getLogFileDetacherThread() {
+ return logFileDetacherThread;
+ }
+
+ public void setLogFileDetacherThread(Thread logFileDetacherThread) {
+ this.logFileDetacherThread = logFileDetacherThread;
+ }
+
+ public Thread getLogFilePathUpdaterThread() {
+ return logFilePathUpdaterThread;
+ }
+
+ public void setLogFilePathUpdaterThread(Thread logFilePathUpdaterThread) {
+ this.logFilePathUpdaterThread = logFilePathUpdaterThread;
+ }
+
+ public Integer getMaxAgeMin() {
+ return maxAgeMin;
+ }
+
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
index 70aa681..40475c6 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.ambari.logfeeder.input;
import com.google.common.annotations.VisibleForTesting;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.input.monitor.CheckpointCleanupMonitor;
import org.apache.ambari.logfeeder.plugin.common.MetricData;
import org.apache.ambari.logfeeder.plugin.input.Input;
import org.apache.ambari.logfeeder.plugin.manager.InputManager;
@@ -152,6 +153,10 @@ public class InputManagerImpl extends InputManager {
if (isCheckPointFolderValid) {
LOG.info("Using folder " + checkPointFolderFile + " for storing
checkpoints");
+ // check checkpoint cleanup every 2000 min
+ Thread checkpointCleanupThread = new Thread(new
CheckpointCleanupMonitor(this, 2000),"checkpoint_cleanup");
+ checkpointCleanupThread.setDaemon(true);
+ checkpointCleanupThread.start();
} else {
throw new IllegalStateException("Could not determine the checkpoint
folder.");
}
@@ -304,6 +309,10 @@ public class InputManagerImpl extends InputManager {
String logFilePath = (String) jsonCheckPoint.get("file_path");
String logFileKey = (String) jsonCheckPoint.get("file_key");
+ Integer maxAgeMin = null;
+ if (jsonCheckPoint.containsKey("max_age_min")) {
+ maxAgeMin =
Integer.parseInt(jsonCheckPoint.get("max_age_min").toString());
+ }
if (logFilePath != null && logFileKey != null) {
boolean deleteCheckPointFile = false;
File logFile = new File(logFilePath);
@@ -314,6 +323,9 @@ public class InputManagerImpl extends InputManager {
LOG.info("CheckPoint clean: File key has changed. old=" +
logFileKey + ", new=" + fileBase64 + ", filePath=" +
logFilePath + ", checkPointFile=" +
checkPointFile.getAbsolutePath());
deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(),
logFileKey);
+ } else if (maxAgeMin != null && maxAgeMin != 0 &&
FileUtil.isFileTooOld(logFile, maxAgeMin)) {
+ deleteCheckPointFile = true;
+ LOG.info("Checkpoint clean: File reached max age minutes (" +
maxAgeMin + "):" + logFilePath);
}
} else {
LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" +
logFilePath + ", checkPointFile=" +
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
index 9c607cf..7b8f0cd 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
@@ -58,6 +58,9 @@ public class FileCheckInHelper {
}
inputFile.getLastCheckPointTimeMSs().put(inputMarker.getBase64FileKey(),
currMS);
+ if (inputFile.getMaxAgeMin() != 0) {
+ jsonCheckPoint.put("max_age_min", inputFile.getMaxAgeMin().toString());
+ }
jsonCheckPoint.put("line_number", "" + new
Integer(inputMarker.getLineNumber()));
jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
jsonCheckPoint.put("last_write_time_date", new Date());
@@ -89,5 +92,4 @@ public class FileCheckInHelper {
}
-
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
new file mode 100644
index 0000000..e0acde1
--- /dev/null
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractLogFileMonitor implements Runnable {
+
+ private Logger LOG = LoggerFactory.getLogger(AbstractLogFileMonitor.class);
+
+ private final InputFile inputFile;
+ private final int waitInterval;
+ private final int detachTime;
+
+ AbstractLogFileMonitor(InputFile inputFile, int waitInterval, int
detachTime) {
+ this.inputFile = inputFile;
+ this.waitInterval = waitInterval;
+ this.detachTime = detachTime;
+ }
+
+ public InputFile getInputFile() {
+ return inputFile;
+ }
+
+ public int getDetachTime() {
+ return detachTime;
+ }
+
+ @Override
+ public void run() {
+ LOG.info(getStartLog());
+
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ Thread.sleep(1000 * waitInterval);
+ monitorAndUpdate();
+ } catch (Exception e) {
+ LOG.error("Monitor thread interrupted.", e);
+ }
+ }
+ }
+
+ protected abstract String getStartLog();
+
+ protected abstract void monitorAndUpdate() throws Exception;
+}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
new file mode 100644
index 0000000..65a314e
--- /dev/null
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.plugin.manager.InputManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CheckpointCleanupMonitor implements Runnable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CheckpointCleanupMonitor.class);
+
+ private long waitIntervalMin;
+ private InputManager inputManager;
+
+ public CheckpointCleanupMonitor(InputManager inputManager, long
waitIntervalMin) {
+ this.waitIntervalMin = waitIntervalMin;
+ this.inputManager = inputManager;
+ }
+
+ @Override
+ public void run() {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ Thread.sleep(1000 * 60 * waitIntervalMin);
+ inputManager.cleanCheckPointFiles();
+ } catch (Exception e) {
+ LOG.error("Cleanup checkpoint files thread interrupted.", e);
+ }
+ }
+ }
+}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
new file mode 100644
index 0000000..a40e118
--- /dev/null
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Detach log files in case of folders do not exist or monitored files are too
old
+ */
+public class LogFileDetachMonitor extends AbstractLogFileMonitor {
+
+ private Logger LOG = LoggerFactory.getLogger(LogFileDetachMonitor.class);
+
+ public LogFileDetachMonitor(InputFile inputFile, int interval, int
detachTime) {
+ super(inputFile, interval, detachTime);
+ }
+
+ @Override
+ public String getStartLog() {
+ return "Start file detach monitor thread for " +
getInputFile().getFilePath();
+ }
+
+ @Override
+ protected void monitorAndUpdate() throws Exception {
+ File[] logFiles = getInputFile().getActualInputLogFiles();
+ Map<String, List<File>> actualFolderMap =
FileUtil.getFoldersForFiles(logFiles);
+
+ // create map copies
+ Map<String, InputFile> copiedInputFileMap = new
HashMap<>(getInputFile().getInputChildMap());
+ Map<String, List<File>> copiedFolderMap = new
HashMap<>(getInputFile().getFolderMap());
+ // detach old entries
+ for (Map.Entry<String, List<File>> entry : copiedFolderMap.entrySet()) {
+ if (new File(entry.getKey()).exists()) {
+ for (Map.Entry<String, InputFile> inputFileEntry :
copiedInputFileMap.entrySet()) {
+ if (inputFileEntry.getKey().startsWith(entry.getKey())) {
+ File monitoredFile = entry.getValue().get(0);
+ boolean isFileTooOld = FileUtil.isFileTooOld(monitoredFile,
getDetachTime());
+ if (isFileTooOld) {
+ LOG.info("File ('{}') in folder ('{}') is too old (reached {}
minutes), detach input thread.", entry.getKey(), getDetachTime());
+ getInputFile().stopChildInputFileThread(entry.getKey());
+ }
+ }
+ }
+ } else {
+ LOG.info("Folder not exists. ({}) Stop thread.", entry.getKey());
+ for (Map.Entry<String, InputFile> inputFileEntry :
copiedInputFileMap.entrySet()) {
+ if (inputFileEntry.getKey().startsWith(entry.getKey())) {
+ getInputFile().stopChildInputFileThread(entry.getKey());
+ getInputFile().setFolderMap(actualFolderMap);
+ }
+ }
+ }
+ }
+ }
+}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
new file mode 100644
index 0000000..bfcab5d
--- /dev/null
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Update log file paths periodically, useful if the log file name pattern
format is like: mylog-2017-10-09.log (so the tail one can change)
+ */
+public class LogFilePathUpdateMonitor extends AbstractLogFileMonitor {
+
+ private Logger LOG = LoggerFactory.getLogger(LogFilePathUpdateMonitor.class);
+
+ public LogFilePathUpdateMonitor(InputFile inputFile, int interval, int
detachTime) {
+ super(inputFile, interval, detachTime);
+ }
+
+ @Override
+ public String getStartLog() {
+ return "Start file path update monitor thread for " +
getInputFile().getFilePath();
+ }
+
+ @Override
+ protected void monitorAndUpdate() throws Exception {
+ File[] logFiles = getInputFile().getActualInputLogFiles();
+ Map<String, List<File>> foldersMap = FileUtil.getFoldersForFiles(logFiles);
+ Map<String, List<File>> originalFoldersMap = getInputFile().getFolderMap();
+ for (Map.Entry<String, List<File>> entry : foldersMap.entrySet()) {
+ if (originalFoldersMap.keySet().contains(entry.getKey())) {
+ List<File> originalLogFiles = originalFoldersMap.get(entry.getKey());
+ if (!entry.getValue().isEmpty()) { // check tail only for now
+ File lastFile = entry.getValue().get(0);
+ if
(!originalLogFiles.get(0).getAbsolutePath().equals(lastFile.getAbsolutePath()))
{
+ LOG.info("New file found (old: '{}', new: {}), reload thread for
{}",
+ lastFile.getAbsolutePath(),
originalLogFiles.get(0).getAbsolutePath(), entry.getKey());
+ getInputFile().stopChildInputFileThread(entry.getKey());
+ getInputFile().startNewChildInputFileThread(entry);
+ }
+ }
+ } else {
+ LOG.info("New log file folder found: {}, start a new thread if tail
file is not too old.", entry.getKey());
+ File monitoredFile = entry.getValue().get(0);
+ if (FileUtil.isFileTooOld(monitoredFile, getDetachTime())) {
+ LOG.info("'{}' file is too old. No new thread start needed.",
monitoredFile.getAbsolutePath());
+ } else {
+ getInputFile().startNewChildInputFileThread(entry);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
index e7e0eef..834cf7e 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
@@ -48,7 +48,7 @@ public class OutputManagerImpl extends OutputManager {
private static final int MAX_OUTPUT_SIZE = 32765; // 32766-1
- private List<Output> outputs = new ArrayList<Output>();
+ private List<Output> outputs = new ArrayList<>();
private boolean addMessageMD5 = true;
@@ -100,24 +100,7 @@ public class OutputManagerImpl extends OutputManager {
// TODO: Ideally most of the overrides should be configurable
- if (jsonObj.get("type") == null) {
- jsonObj.put("type", input.getInputDescriptor().getType());
- }
- if (input.getClass().isAssignableFrom(InputFile.class)) { // TODO: find
better solution
- InputFile inputFile = (InputFile) input;
- if (jsonObj.get("path") == null && inputFile.getFilePath() != null) {
- jsonObj.put("path", inputFile.getFilePath());
- }
- }
- if (jsonObj.get("path") == null && input.getInputDescriptor().getPath() !=
null) {
- jsonObj.put("path", input.getInputDescriptor().getPath());
- }
- if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) {
- jsonObj.put("host", LogFeederUtil.hostName);
- }
- if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) {
- jsonObj.put("ip", LogFeederUtil.ipAddress);
- }
+ LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, true);
jsonObj.putIfAbsent("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
if (input.isUseEventMD5() || input.isGenEventMD5()) {
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
index 8ade992..cc6b1e0 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,25 +21,45 @@ package org.apache.ambari.logfeeder.util;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
+import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
-import org.apache.log4j.Logger;
+import org.apache.commons.io.FileUtils;
+import org.apache.tools.ant.DirectoryScanner;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FileUtil {
- private static final Logger LOG = Logger.getLogger(FileUtil.class);
-
+ private static final Logger LOG = LoggerFactory.getLogger(FileUtil.class);
+ private static final String FOLDER_SEPARATOR = "/";
+
private FileUtil() {
throw new UnsupportedOperationException();
}
+ public static List<File> getAllFileFromDir(File directory, String extension,
boolean checkInSubDir) {
+ if (!directory.exists()) {
+ LOG.error(directory.getAbsolutePath() + " is not exists ");
+ } else if (!directory.isDirectory()) {
+ LOG.error(directory.getAbsolutePath() + " is not Directory ");
+ } else {
+ return (List<File>) FileUtils.listFiles(directory, new
String[]{extension}, checkInSubDir);
+ }
+ return new ArrayList<File>();
+ }
+
+
public static Object getFileKey(File file) {
try {
Path fileFullPath = Paths.get(file.getAbsolutePath());
@@ -52,20 +72,111 @@ public class FileUtil {
}
return file.toString();
}
-
- public static HashMap<String, Object> getJsonFileContentFromClassPath(String
fileName) {
+
+ public static File getFileFromClasspath(String filename) {
+ URL fileCompleteUrl =
Thread.currentThread().getContextClassLoader().getResource(filename);
+ LOG.debug("File Complete URI :" + fileCompleteUrl);
+ File file = null;
+ try {
+ file = new File(fileCompleteUrl.toURI());
+ } catch (Exception exception) {
+ LOG.debug(exception.getMessage(), exception.getCause());
+ }
+ return file;
+ }
+
+ public static HashMap<String, Object> readJsonFromFile(File jsonFile) {
ObjectMapper mapper = new ObjectMapper();
- try (InputStream inputStream =
FileUtil.class.getClassLoader().getResourceAsStream(fileName)) {
- return mapper.readValue(inputStream, new TypeReference<HashMap<String,
Object>>() {});
+ try {
+ HashMap<String, Object> jsonmap = mapper.readValue(jsonFile, new
TypeReference<HashMap<String, Object>>() {});
+ return jsonmap;
} catch (IOException e) {
- LOG.error(e, e.getCause());
+ LOG.error("{}", e);
}
return new HashMap<String, Object>();
}
-
+
+ public static File[] getInputFilesByPattern(String searchPath) {
+ File searchFile = new File(searchPath);
+ if (searchFile.isFile()) {
+ return new File[]{searchFile};
+ } else {
+ if (searchPath.contains("*")) {
+ try {
+ String folderBeforeRegex = getLogDirNameBeforeWildCard(searchPath);
+ String fileNameAfterLastFolder =
searchPath.substring(folderBeforeRegex.length());
+
+ DirectoryScanner scanner = new DirectoryScanner();
+ scanner.setIncludes(new String[]{fileNameAfterLastFolder});
+ scanner.setBasedir(folderBeforeRegex);
+ scanner.setCaseSensitive(true);
+ scanner.scan();
+ String[] fileNames = scanner.getIncludedFiles();
+
+ if (fileNames != null && fileNames.length > 0) {
+ File[] files = new File[fileNames.length];
+ for (int i = 0; i < fileNames.length; i++) {
+ files[i] = new File(folderBeforeRegex + fileNames[i]);
+ }
+ return files;
+ }
+ } catch (Exception e) {
+ LOG.warn("Input file not found by pattern (exception thrown); {},
message: {}", searchPath, e.getMessage());
+ }
+
+ } else {
+ LOG.warn("Input file config not found by pattern; {}", searchPath);
+ }
+ return new File[]{};
+ }
+ }
+
+ public static Map<String, List<File>> getFoldersForFiles(File[] inputFiles) {
+ Map<String, List<File>> foldersMap = new HashMap<>();
+ if (inputFiles != null && inputFiles.length > 0) {
+ for (File inputFile : inputFiles) {
+ File folder = inputFile.getParentFile();
+ if (folder.exists()) {
+ if (foldersMap.containsKey(folder.getAbsolutePath())) {
+ foldersMap.get(folder.getAbsolutePath()).add(inputFile);
+ } else {
+ List<File> fileList = new ArrayList<>();
+ fileList.add(inputFile);
+ foldersMap.put(folder.getAbsolutePath(), fileList);
+ }
+ }
+ }
+ }
+ if (!foldersMap.isEmpty()) {
+ for (Map.Entry<String, List<File>> entry : foldersMap.entrySet()) {
+ Collections.sort(entry.getValue(), Collections.reverseOrder());
+ }
+ }
+ return foldersMap;
+ }
+
+ private static String getLogDirNameBeforeWildCard(String pattern) {
+ String[] splitByFirstRegex = pattern.split("\\*");
+ String beforeRegex = splitByFirstRegex[0];
+ if (beforeRegex.contains(FOLDER_SEPARATOR)) {
+ int endIndex = beforeRegex.lastIndexOf(FOLDER_SEPARATOR);
+ String parentFolder = beforeRegex;
+ if (endIndex != -1) {
+ parentFolder = beforeRegex.substring(0, endIndex) + FOLDER_SEPARATOR;
+ }
+ return parentFolder;
+ } else {
+ return beforeRegex;
+ }
+ }
+
public static void move(File source, File target) throws IOException {
Path sourcePath = Paths.get(source.getAbsolutePath());
Path targetPath = Paths.get(target.getAbsolutePath());
Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE,
StandardCopyOption.REPLACE_EXISTING);
}
-}
+
+ public static boolean isFileTooOld(File file, long diffMin) {
+ return (System.currentTimeMillis() - file.lastModified()) > diffMin * 1000
* 60;
+ }
+}
\ No newline at end of file
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
index 6fd00a4..9b0b0e8 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
@@ -21,7 +21,9 @@ package org.apache.ambari.logfeeder.util;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
+import org.apache.ambari.logfeeder.input.InputFile;
import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -137,4 +139,26 @@ public class LogFeederUtil {
return false;
}
}
+
+ public static void fillMapWithFieldDefaults(Map<String, Object> jsonObj,
InputMarker inputMarker, boolean force) {
+ if (inputMarker != null && inputMarker.getInput() != null && (force ||
inputMarker.getInput().isInitDefaultFields())) {
+ if (jsonObj.get("type") == null) {
+ jsonObj.put("type",
inputMarker.getInput().getInputDescriptor().getType());
+ }
+ if (inputMarker.getInput() instanceof InputFile) {
+ if (jsonObj.get("path") == null &&
((InputFile)inputMarker.getInput()).getFilePath() != null) {
+ jsonObj.put("path",
((InputFile)inputMarker.getInput()).getFilePath());
+ }
+ }
+ if (jsonObj.get("path") == null &&
inputMarker.getInput().getInputDescriptor().getPath() != null) {
+ jsonObj.put("path",
inputMarker.getInput().getInputDescriptor().getPath());
+ }
+ if (jsonObj.get("host") == null && hostName != null) {
+ jsonObj.put("host", hostName);
+ }
+ if (jsonObj.get("ip") == null && ipAddress != null) {
+ jsonObj.put("ip", ipAddress);
+ }
+ }
+ }
}
diff --git
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterGrok.java
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterGrok.java
index 6de9f4c..215c572 100644
---
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterGrok.java
+++
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterGrok.java
@@ -41,6 +41,12 @@ public class LSServerFilterGrok extends LSServerFilter {
@JsonProperty("message_pattern")
private String messagePattern;
+ @JsonProperty
+ private boolean skipOnError;
+
+ @JsonProperty
+ private boolean deepExtract;
+
public LSServerFilterGrok() {}
public LSServerFilterGrok(FilterDescriptor filterDescriptor) {
@@ -50,6 +56,8 @@ public class LSServerFilterGrok extends LSServerFilter {
this.log4jFormat = filterGrokDescriptor.getLog4jFormat();
this.multilinePattern = filterGrokDescriptor.getMultilinePattern();
this.messagePattern = filterGrokDescriptor.getMessagePattern();
+ this.skipOnError = filterGrokDescriptor.isSkipOnError();
+ this.deepExtract = filterGrokDescriptor.isDeepExtract();
}
}
@@ -76,4 +84,20 @@ public class LSServerFilterGrok extends LSServerFilter {
public void setMessagePattern(String messagePattern) {
this.messagePattern = messagePattern;
}
+
+ public boolean isSkipOnError() {
+ return skipOnError;
+ }
+
+ public void setSkipOnError(boolean skipOnError) {
+ this.skipOnError = skipOnError;
+ }
+
+ public boolean isDeepExtract() {
+ return deepExtract;
+ }
+
+ public void setDeepExtract(boolean deepExtract) {
+ this.deepExtract = deepExtract;
+ }
}
diff --git
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInput.java
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInput.java
index 6ef3d3f..af28f17 100644
---
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInput.java
+++
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInput.java
@@ -70,6 +70,9 @@ public abstract class LSServerInput {
@JsonProperty("is_enabled")
private Boolean isEnabled;
+
+ @JsonProperty("init_default_fields")
+ private Boolean initDefaultFields;
public LSServerInput() {}
@@ -88,6 +91,7 @@ public abstract class LSServerInput {
this.cacheSize = inputDescriptor.getCacheSize();
this.cacheDedupInterval = inputDescriptor.getCacheDedupInterval();
this.isEnabled = inputDescriptor.isEnabled();
+ this.initDefaultFields = inputDescriptor.isInitDefaultFields();
}
public String getType() {
@@ -145,4 +149,8 @@ public abstract class LSServerInput {
public Boolean getIsEnabled() {
return isEnabled;
}
+
+ public Boolean getInitDefaultFields() {
+ return initDefaultFields;
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
index bb2a49c..efa56a2 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
@@ -19,15 +19,67 @@
package org.apache.ambari.logsearch.model.common;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
 import io.swagger.annotations.ApiModel;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
@ApiModel
public class LSServerInputFile extends LSServerInputFileBase {
+
+ @JsonProperty("detach_interval_min")
+ private Integer detachIntervalMin;
+
+ @JsonProperty("detach_time_min")
+ private Integer detachTimeMin;
+
+ @JsonProperty("path_update_interval_min")
+ private Integer pathUpdateIntervalMin;
+
+ @JsonProperty("max_age_min")
+ private Integer maxAgeMin;
+
public LSServerInputFile() {}
public LSServerInputFile(InputDescriptor inputDescriptor) {
super(inputDescriptor);
+    InputFileDescriptor inputFileDescriptor = (InputFileDescriptor)inputDescriptor;
+ this.detachIntervalMin = inputFileDescriptor.getDetachIntervalMin();
+ this.detachTimeMin = inputFileDescriptor.getDetachTimeMin();
+    this.pathUpdateIntervalMin = inputFileDescriptor.getPathUpdateIntervalMin();
+ this.maxAgeMin = inputFileDescriptor.getMaxAgeMin();
+ }
+
+ public Integer getDetachIntervalMin() {
+ return detachIntervalMin;
+ }
+
+ public void setDetachIntervalMin(Integer detachIntervalMin) {
+ this.detachIntervalMin = detachIntervalMin;
+ }
+
+ public Integer getDetachTimeMin() {
+ return detachTimeMin;
+ }
+
+ public void setDetachTimeMin(Integer detachTimeMin) {
+ this.detachTimeMin = detachTimeMin;
+ }
+
+ public Integer getPathUpdateIntervalMin() {
+ return pathUpdateIntervalMin;
+ }
+
+ public void setPathUpdateIntervalMin(Integer pathUpdateIntervalMin) {
+ this.pathUpdateIntervalMin = pathUpdateIntervalMin;
+ }
+
+ public Integer getMaxAgeMin() {
+ return maxAgeMin;
+ }
+
+ public void setMaxAgeMin(Integer maxAgeMin) {
+ this.maxAgeMin = maxAgeMin;
}
}
diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-ambari.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-ambari.json
index e3f6f12..081e16e 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-ambari.json
+++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-ambari.json
@@ -1,591 +1,376 @@
{
- "input":[
+ "input": [
{
- "type":"ambari_agent",
- "rowtype":"service",
- "path":"/root/test-logs/ambari-server/ambari-agent.log",
- "group": "Ambari"
- },
- {
- "type":"ambari_server",
- "rowtype":"service",
- "path":"/root/test-logs/ambari-server/ambari-server.log",
- "group": "Ambari"
- },
- {
- "type":"ambari_alerts",
- "rowtype":"service",
- "add_fields":{
- "level":"INFO"
- },
- "path":"/root/test-logs/ambari-server/ambari-alerts.log",
- "group": "Ambari"
- },
- {
- "type":"ambari_config_changes",
- "rowtype":"service",
- "path":"/root/test-logs/ambari-server/ambari-config-changes.log",
- "group": "Ambari"
- },
- {
- "type":"ambari_eclipselink",
- "rowtype":"service",
- "path":"/root/test-logs/ambari-server/ambari-eclipselink.log",
- "group": "Ambari"
- },
- {
- "type":"ambari_server_check_database",
- "rowtype":"service",
- "path":"/root/test-logs/ambari-server/ambari-server-check-database.log",
- "group": "Ambari"
- },
- {
- "type":"ambari_audit",
- "rowtype":"audit",
- "add_fields":{
- "logType":"AmbariAudit",
- "enforcer":"ambari-acl",
- "repoType":"1",
- "repo":"ambari",
- "level":"INFO"
+ "type": "ambari_audit",
+ "rowtype": "audit",
+ "add_fields": {
+ "logType": "AmbariAudit",
+ "enforcer": "ambari-acl",
+ "repoType": "1",
+ "repo": "ambari",
+ "level": "INFO"
},
- "path":"/root/test-logs/ambari-server/ambari-audit.log"
+ "path": "/root/test-logs/ambari-server/ambari-audit.log"
}
-
],
- "filter":[
- {
- "filter":"grok",
- "conditions":{
- "fields":{
- "type":[
- "ambari_agent"
- ]
-
- }
-
- },
- "log4j_format":"",
- "multiline_pattern":"^(%{LOGLEVEL:level} %{TIMESTAMP_ISO8601:logtime})",
- "message_pattern":"(?m)^%{LOGLEVEL:level} %{TIMESTAMP_ISO8601:logtime}
%{JAVAFILE:file}:%{INT:line_number} - %{GREEDYDATA:log_message}",
- "post_map_values":{
- "logtime":{
- "map_date":{
- "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
- }
-
- },
- "level":{
- "map_fieldvalue":{
- "pre_value":"WARNING",
- "post_value":"WARN"
- }
-
- }
-
- }
-
- },
- {
- "filter":"grok",
- "conditions":{
- "fields":{
- "type":[
- "ambari_server"
- ]
-
- }
-
- },
- "log4j_format":"%d{DATE} %5p [%t] %c{1}:%L - %m%n",
- "multiline_pattern":"^(%{USER_SYNC_DATE:logtime})",
-      "message_pattern":"(?m)^%{USER_SYNC_DATE:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}\\[%{DATA:thread_name}\\]%{SPACE}%{JAVACLASS:logger_name}:%{INT:line_number}%{SPACE}-%{SPACE}%{GREEDYDATA:log_message}",
- "post_map_values":{
- "logtime":{
- "map_date":{
- "target_date_pattern":"dd MMM yyyy HH:mm:ss"
- }
-
- }
-
- }
-
- },
- {
- "filter":"grok",
- "conditions":{
- "fields":{
- "type":[
- "ambari_alerts"
- ]
-
- }
-
- },
- "log4j_format":"%d{DATE} %5p [%t] %c{1}:%L - %m%n",
- "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})",
-      "message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{GREEDYDATA:log_message}",
- "post_map_values":{
- "logtime":{
- "map_date":{
- "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
- }
-
- }
-
- }
-
- },
- {
- "filter":"grok",
- "conditions":{
- "fields":{
- "type":[
- "ambari_config_changes"
- ]
-
- }
-
- },
- "log4j_format":"%d{DATE} %5p [%t] %c{1}:%L - %m%n",
- "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})",
-      "message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}-%{SPACE}%{GREEDYDATA:log_message}",
- "post_map_values":{
- "logtime":{
- "map_date":{
- "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
- }
-
- }
-
- }
-
- },
+ "filter": [
{
- "filter":"grok",
- "conditions":{
- "fields":{
- "type":[
- "ambari_eclipselink"
- ]
-
- }
-
- },
- "log4j_format":"%d{DATE} %5p [%t] %c{1}:%L - %m%n",
- "multiline_pattern":"^(\\[EL%{SPACE}%{LOGLEVEL:level}\\])",
-      "message_pattern":"(?m)^\\[EL%{SPACE}%{LOGLEVEL:level}\\]:%{SPACE}%{TIMESTAMP_ISO8601:logtime}%{GREEDYDATA:log_message}",
- "post_map_values":{
- "logtime":{
- "map_date":{
- "target_date_pattern":"yyyy-MM-dd HH:mm:ss.SSS"
- }
-
- },
- "level":{
- "map_fieldvalue":{
- "pre_value":"Warning",
- "post_value":"Warn"
- }
-
- }
-
- }
-
- },
- {
- "filter":"grok",
- "conditions":{
- "fields":{
- "type":[
- "ambari_server_check_database"
- ]
-
- }
-
- },
- "log4j_format":"%d{DATE} %5p [%t] %c{1}:%L - %m%n",
- "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})",
-      "message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}-%{SPACE}%{GREEDYDATA:log_message}",
- "post_map_values":{
- "logtime":{
- "map_date":{
- "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
- }
-
- }
-
- }
-
- },
- {
- "filter":"grok",
- "conditions":{
- "fields":{
- "type":[
+ "filter": "grok",
+ "conditions": {
+ "fields": {
+ "type": [
"ambari_audit"
]
-
}
-
},
- "log4j_format":"%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n",
- "multiline_pattern":"^(%{TIMESTAMP_ISO8601:evtTime})",
-      "message_pattern":"(?m)^%{TIMESTAMP_ISO8601:evtTime},%{SPACE}%{GREEDYDATA:log_message}",
- "post_map_values":{
- "evtTime":{
- "map_date":{
- "target_date_pattern":"yyyy-MM-dd'T'HH:mm:ss.SSSXX"
+ "log4j_format": "%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n",
+ "multiline_pattern": "^(%{TIMESTAMP_ISO8601:evtTime})",
+      "message_pattern": "(?m)^%{TIMESTAMP_ISO8601:evtTime},%{SPACE}%{GREEDYDATA:log_message}",
+ "post_map_values": {
+ "evtTime": {
+ "map_date": {
+ "target_date_pattern": "yyyy-MM-dd'T'HH:mm:ss.SSSXX"
}
-
}
-
}
-
},
{
- "filter":"keyvalue",
- "sort_order":1,
- "conditions":{
- "fields":{
- "type":[
+ "filter": "keyvalue",
+ "sort_order": 1,
+ "conditions": {
+ "fields": {
+ "type": [
"ambari_audit"
]
-
}
-
},
- "source_field":"log_message",
- "field_split":", ",
- "value_borders":"()",
- "post_map_values":{
- "User":{
- "map_fieldvalue":{
- "pre_value":"null",
- "post_value":"unknown"
- },
- "map_fieldname":{
- "new_fieldname":"reqUser"
+ "source_field": "log_message",
+ "field_split": ", ",
+ "value_borders": "()",
+ "post_map_values": {
+ "User": {
+ "map_fieldvalue": {
+ "pre_value": "null",
+ "post_value": "unknown"
+ },
+ "map_fieldname": {
+ "new_fieldname": "reqUser"
}
},
- "Hostname":{
- "map_fieldname":{
- "new_fieldname":"host"
+ "Hostname": {
+ "map_fieldname": {
+ "new_fieldname": "host"
}
},
- "Host name":{
- "map_fieldname":{
- "new_fieldname":"host"
+ "Host name": {
+ "map_fieldname": {
+ "new_fieldname": "host"
}
},
- "RemoteIp":{
- "map_fieldname":{
- "new_fieldname":"cliIP"
+ "RemoteIp": {
+ "map_fieldname": {
+ "new_fieldname": "cliIP"
}
},
- "RequestType":{
- "map_fieldname":{
- "new_fieldname":"cliType"
+ "RequestType": {
+ "map_fieldname": {
+ "new_fieldname": "cliType"
}
},
- "Operation":{
- "map_fieldname":{
- "new_fieldname":"action"
+ "Operation": {
+ "map_fieldname": {
+ "new_fieldname": "action"
}
},
- "url":{
- "map_fieldname":{
- "new_fieldname":"resource"
+ "url": {
+ "map_fieldname": {
+ "new_fieldname": "resource"
}
},
- "ResourcePath":{
- "map_fieldname":{
- "new_fieldname":"resource"
+ "ResourcePath": {
+ "map_fieldname": {
+ "new_fieldname": "resource"
}
},
- "Cluster name":{
- "map_fieldname":{
- "new_fieldname":"cluster"
+ "Cluster name": {
+ "map_fieldname": {
+ "new_fieldname": "cluster"
}
},
- "Reason":{
- "map_fieldname":{
- "new_fieldname":"reason"
+ "Reason": {
+ "map_fieldname": {
+ "new_fieldname": "reason"
}
},
- "Base URL":{
- "map_fieldname":{
- "new_fieldname":"ws_base_url"
+ "Base URL": {
+ "map_fieldname": {
+ "new_fieldname": "ws_base_url"
}
},
- "Command":{
- "map_fieldvalue":{
- "pre_value":"null",
- "post_value":"unknown"
+ "Command": {
+ "map_fieldvalue": {
+ "pre_value": "null",
+ "post_value": "unknown"
},
- "map_fieldname":{
- "new_fieldname":"ws_command"
+ "map_fieldname": {
+ "new_fieldname": "ws_command"
}
},
- "Component":{
- "map_fieldname":{
- "new_fieldname":"ws_component"
+ "Component": {
+ "map_fieldname": {
+ "new_fieldname": "ws_component"
}
},
- "Details":{
- "map_fieldname":{
- "new_fieldname":"ws_details"
+ "Details": {
+ "map_fieldname": {
+ "new_fieldname": "ws_details"
}
},
- "Display name":{
- "map_fieldvalue":{
- "pre_value":"null",
- "post_value":"unknown"
+ "Display name": {
+ "map_fieldvalue": {
+ "pre_value": "null",
+ "post_value": "unknown"
},
- "map_fieldname":{
- "new_fieldname":"ws_display_name"
+ "map_fieldname": {
+ "new_fieldname": "ws_display_name"
}
},
- "OS":{
- "map_fieldname":{
- "new_fieldname":"ws_os"
+ "OS": {
+ "map_fieldname": {
+ "new_fieldname": "ws_os"
}
},
- "Repo id":{
- "map_fieldname":{
- "new_fieldname":"ws_repo_id"
+ "Repo id": {
+ "map_fieldname": {
+ "new_fieldname": "ws_repo_id"
}
},
- "Repo version":{
- "map_fieldvalue":{
- "pre_value":"null",
- "post_value":"unknown"
+ "Repo version": {
+ "map_fieldvalue": {
+ "pre_value": "null",
+ "post_value": "unknown"
},
- "map_fieldname":{
- "new_fieldname":"ws_repo_version"
+ "map_fieldname": {
+ "new_fieldname": "ws_repo_version"
}
},
- "Repositories":{
- "map_fieldname":{
- "new_fieldname":"ws_repositories"
+ "Repositories": {
+ "map_fieldname": {
+ "new_fieldname": "ws_repositories"
}
},
- "RequestId":{
- "map_fieldname":{
- "new_fieldname":"ws_request_id"
+ "RequestId": {
+ "map_fieldname": {
+ "new_fieldname": "ws_request_id"
}
},
- "Roles":{
- "map_fieldname":{
- "new_fieldname":"ws_roles"
+ "Roles": {
+ "map_fieldname": {
+ "new_fieldname": "ws_roles"
}
},
- "Stack":{
- "map_fieldname":{
- "new_fieldname":"ws_stack"
+ "Stack": {
+ "map_fieldname": {
+ "new_fieldname": "ws_stack"
}
},
- "Stack version":{
- "map_fieldname":{
- "new_fieldname":"ws_stack_version"
+ "Stack version": {
+ "map_fieldname": {
+ "new_fieldname": "ws_stack_version"
}
},
- "TaskId":{
- "map_fieldname":{
- "new_fieldname":"ws_task_id"
+ "TaskId": {
+ "map_fieldname": {
+ "new_fieldname": "ws_task_id"
}
},
- "VersionNote":{
- "map_fieldvalue":{
- "pre_value":"null",
- "post_value":"unknown"
+ "VersionNote": {
+ "map_fieldvalue": {
+ "pre_value": "null",
+ "post_value": "unknown"
},
- "map_fieldname":{
- "new_fieldname":"ws_version_note"
+ "map_fieldname": {
+ "new_fieldname": "ws_version_note"
}
},
- "VersionNumber":{
- "map_fieldvalue":{
- "pre_value":"null",
- "post_value":"unknown"
+ "VersionNumber": {
+ "map_fieldvalue": {
+ "pre_value": "null",
+ "post_value": "unknown"
},
- "map_fieldname":{
- "new_fieldname":"ws_version_number"
+ "map_fieldname": {
+ "new_fieldname": "ws_version_number"
}
},
- "Status":[
- {
- "map_fieldcopy":{
- "copy_name": "ws_status"
- }
- },
- {
- "map_fieldvalue":{
- "pre_value":"Success",
- "post_value":"1"
+ "Status": [
+ {
+ "map_fieldcopy": {
+ "copy_name": "ws_status"
+ }
+ },
+ {
+ "map_fieldvalue": {
+ "pre_value": "Success",
+ "post_value": "1"
}
},
{
- "map_fieldvalue":{
- "pre_value":"Successfully queued",
- "post_value":"1"
+ "map_fieldvalue": {
+ "pre_value": "Successfully queued",
+ "post_value": "1"
}
},
{
- "map_fieldvalue":{
- "pre_value":"QUEUED",
- "post_value":"1"
+ "map_fieldvalue": {
+ "pre_value": "QUEUED",
+ "post_value": "1"
}
},
{
- "map_fieldvalue":{
- "pre_value":"PENDING",
- "post_value":"1"
+ "map_fieldvalue": {
+ "pre_value": "PENDING",
+ "post_value": "1"
}
},
{
- "map_fieldvalue":{
- "pre_value":"COMPLETED",
- "post_value":"1"
+ "map_fieldvalue": {
+ "pre_value": "COMPLETED",
+ "post_value": "1"
}
},
{
- "map_fieldvalue":{
- "pre_value":"IN_PROGRESS",
- "post_value":"1"
+ "map_fieldvalue": {
+ "pre_value": "IN_PROGRESS",
+ "post_value": "1"
}
},
{
- "map_fieldvalue":{
- "pre_value":"Failed",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "Failed",
+ "post_value": "0"
}
},
{
- "map_fieldvalue":{
- "pre_value":"Failed to queue",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "Failed to queue",
+ "post_value": "0"
}
},
{
- "map_fieldvalue":{
- "pre_value":"HOLDING",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "HOLDING",
+ "post_value": "0"
}
},
{
- "map_fieldvalue":{
- "pre_value":"HOLDING_FAILED",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "HOLDING_FAILED",
+ "post_value": "0"
}
},
{
- "map_fieldvalue":{
- "pre_value":"HOLDING_TIMEDOUT",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "HOLDING_TIMEDOUT",
+ "post_value": "0"
}
},
{
- "map_fieldvalue":{
- "pre_value":"FAILED",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "FAILED",
+ "post_value": "0"
}
},
{
- "map_fieldvalue":{
- "pre_value":"TIMEDOUT",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "TIMEDOUT",
+ "post_value": "0"
}
},
{
- "map_fieldvalue":{
- "pre_value":"ABORTED",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "ABORTED",
+ "post_value": "0"
}
},
{
- "map_fieldvalue":{
- "pre_value":"SKIPPED_FAILED",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "SKIPPED_FAILED",
+ "post_value": "0"
}
},
{
- "map_fieldname":{
- "new_fieldname":"result"
+ "map_fieldname": {
+ "new_fieldname": "result"
}
}
],
- "ResultStatus":[
+ "ResultStatus": [
{
- "map_fieldcopy":{
+ "map_fieldcopy": {
"copy_name": "ws_result_status"
}
},
{
- "map_fieldvalue":{
- "pre_value":"200 OK",
- "post_value":"1"
+ "map_fieldvalue": {
+ "pre_value": "200 OK",
+ "post_value": "1"
}
},
{
- "map_fieldvalue":{
- "pre_value":"201 Created",
- "post_value":"1"
+ "map_fieldvalue": {
+ "pre_value": "201 Created",
+ "post_value": "1"
}
},
{
- "map_fieldvalue":{
- "pre_value":"202 Accepted",
- "post_value":"1"
+ "map_fieldvalue": {
+ "pre_value": "202 Accepted",
+ "post_value": "1"
}
},
{
- "map_fieldvalue":{
- "pre_value":"400 Bad Request",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "400 Bad Request",
+ "post_value": "0"
}
},
{
- "map_fieldvalue":{
- "pre_value":"401 Unauthorized",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "401 Unauthorized",
+ "post_value": "0"
}
},
{
- "map_fieldvalue":{
- "pre_value":"403 Forbidden",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "403 Forbidden",
+ "post_value": "0"
}
},
{
- "map_fieldvalue":{
- "pre_value":"404 Not Found",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "404 Not Found",
+ "post_value": "0"
}
},
{
- "map_fieldvalue":{
- "pre_value":"409 Resource Conflict",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "409 Resource Conflict",
+ "post_value": "0"
}
},
{
- "map_fieldvalue":{
- "pre_value":"500 Internal Server Error",
- "post_value":"0"
+ "map_fieldvalue": {
+ "pre_value": "500 Internal Server Error",
+ "post_value": "0"
}
},
{
- "map_fieldname":{
- "new_fieldname":"result"
+ "map_fieldname": {
+ "new_fieldname": "result"
}
}
]
-
}
-
}
-
]
-
}
diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
new file mode 100644
index 0000000..68e6fcf
--- /dev/null
+++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
@@ -0,0 +1,75 @@
+{
+ "input":[
+ {
+ "type":"storm_worker",
+ "rowtype":"service",
+ "path":"/root/test-logs/storm/worker-logs/*/*/worker.log",
+ "init_default_fields": "true"
+ }
+ ],
+ "filter":[
+ {
+ "filter":"grok",
+ "sort_order": 1,
+ "conditions":{
+ "fields":{
+ "type":[
+ "storm_worker"
+ ]
+ }
+ },
+ "log4j_format":"",
+ "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})",
+      "message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{JAVACLASS:logger_name}\\s%{GREEDYDATA:thread_name}\\s\\[%{LOGLEVEL:level}\\]\\s%{GREEDYDATA:log_message}",
+ "post_map_values":{
+ "logtime":{
+ "map_date":{
+ "target_date_pattern":"yyyy-MM-dd HH:mm:ss.SSS"
+ }
+ }
+ }
+ },
+ {
+ "filter":"grok",
+ "sort_order": 2,
+ "conditions":{
+ "fields":{
+ "type":[
+ "storm_worker"
+ ]
+ }
+ },
+ "source_field": "thread_name",
+ "remove_source_field": "false",
+      "message_pattern":"(Thread\\-[\\-0-9]+\\-*[\\-0-9]*\\-%{DATA:sdi_storm_component_name}\\-executor%{DATA}|%{DATA:thread_name})"
+ },
+ {
+ "filter":"grok",
+ "sort_order": 3,
+ "conditions":{
+ "fields":{
+ "type":[
+ "storm_worker"
+ ]
+ }
+ },
+ "source_field": "path",
+ "remove_source_field": "false",
+      "message_pattern":"/root/test-logs/storm/worker-logs/%{DATA:sdi_storm_topology_id}/%{DATA:sdi_storm_worker_port}/worker\\.log"
+ },
+ {
+ "filter":"grok",
+ "sort_order": 4,
+ "conditions":{
+ "fields":{
+ "type":[
+ "storm_worker"
+ ]
+ }
+ },
+ "source_field": "sdi_storm_topology_id",
+ "remove_source_field": "false",
+      "message_pattern":"(streamline\\-%{DATA:sdi_streamline_topology_id}\\-%{DATA:sdi_streamline_topology_name}\\-[0-9]+\\-[0-9]+)|(%{DATA:sdi_storm_topology_id})"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6700/worker.log b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6700/worker.log
new file mode 100644
index 0000000..b6a59ec
--- /dev/null
+++ b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6700/worker.log
@@ -0,0 +1,5 @@
+2017-10-23 13:41:43.481 o.a.s.d.executor Thread-11-__acker-executor[7 8] [INFO] Preparing bolt __acker:(1)
+2017-10-23 13:41:43.483 o.a.s.d.executor Thread-11-__acker-executor[7 8] [WARN] Prepared bolt __acker:(2)
+2017-10-23 13:41:48.834 c.h.s.s.n.EmailNotifier Thread-7-8-NOTIFICATION-executor[3 3] [ERROR] Got exception while initializing transport
+2017-10-23 13:41:58.242 o.a.s.d.executor main [INFO] Loading executor 3-NOTIFICATION:[9 1]
+2017-10-23 13:41:59.242 o.a.s.d.executor Thread-11-__acker-executor[7 8] [WARN] Prepared bolt __acker:(3)
diff --git a/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
new file mode 100644
index 0000000..5f2d20e
--- /dev/null
+++ b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
@@ -0,0 +1,5 @@
+2017-10-23 13:41:43.481 o.a.s.d.executor Thread-11-__acker-executor[5 5] [INFO] Preparing bolt __acker:(4)
+2017-10-23 13:41:43.483 o.a.s.d.executor Thread-11-__acker-executor[5 5] [WARN] Prepared bolt __acker:(5)
+2017-10-23 13:41:48.834 c.h.s.s.n.EmailNotifier Thread-5-3-NOTIFICATION-executor[3 3] [ERROR] Got exception while initializing transport
+2017-10-23 13:41:58.242 o.a.s.d.executor main [INFO] Loading executor 3-NOTIFICATION:[3 1]
+2017-10-23 13:41:59.242 o.a.s.d.executor Thread-11-__acker-executor[5 5] [WARN] Prepared bolt __acker:(6)
--
To stop receiving notification emails like this one, please contact
[email protected].