This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 1b6ef88 METRON-1970 Add Metadata to Error Messages Generated During
Parsing (nickwallen) closes apache/metron#1325
1b6ef88 is described below
commit 1b6ef88c79d60022542cda7e9abbea7e720773cc
Author: nickwallen <[email protected]>
AuthorDate: Tue Feb 12 12:18:09 2019 -0500
METRON-1970 Add Metadata to Error Messages Generated During Parsing
(nickwallen) closes apache/metron#1325
---
.../java/org/apache/metron/common/Constants.java | 1 +
.../apache/metron/common/error/MetronError.java | 79 +++++++++++-----------
.../metron/common/error/MetronErrorTest.java | 47 +++++++++++++
metron-platform/metron-parsing/README.md | 51 ++++++++++----
.../apache/metron/parsers/ParserRunnerImpl.java | 3 +
.../metron/parsers/ParserRunnerImplTest.java | 16 ++++-
6 files changed, 141 insertions(+), 56 deletions(-)
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index 5054508..a0b5bce 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -93,6 +93,7 @@ public class Constants {
,RAW_MESSAGE_BYTES("raw_message_bytes")
,ERROR_FIELDS("error_fields")
,ERROR_HASH("error_hash")
+ ,METADATA("metadata")
;
private String name;
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
index 0493be6..89044de 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
+++
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
@@ -25,10 +25,14 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.common.Constants;
@@ -45,6 +49,7 @@ public class MetronError {
private ErrorType errorType = ErrorType.DEFAULT_ERROR;
private Set<String> errorFields;
private List<Object> rawMessages;
+ private Map<String, Object> metadata = new HashMap<>();
public MetronError withMessage(String message) {
this.message = message;
@@ -71,6 +76,10 @@ public class MetronError {
return this;
}
+ public MetronError withMetadata(Map<String, Object> metadata) {
+ this.metadata.putAll(metadata);
+ return this;
+ }
public MetronError addRawMessage(Object rawMessage) {
if (rawMessage != null) {
@@ -95,25 +104,28 @@ public class MetronError {
public JSONObject getJSONObject() {
JSONObject errorMessage = new JSONObject();
errorMessage.put(Constants.GUID, UUID.randomUUID().toString());
- errorMessage.put(Constants.SENSOR_TYPE, "error");
- if (sensorTypes.size() == 1) {
- errorMessage.put(ErrorFields.FAILED_SENSOR_TYPE.getName(),
sensorTypes.iterator().next());
- } else {
- errorMessage
- .put(ErrorFields.FAILED_SENSOR_TYPE.getName(), new
JSONArray().addAll(sensorTypes));
- }
+ errorMessage.put(Constants.SENSOR_TYPE, Constants.ERROR_TYPE);
errorMessage.put(ErrorFields.ERROR_TYPE.getName(), errorType.getType());
-
+ addFailedSensorType(errorMessage);
addMessageString(errorMessage);
addStacktrace(errorMessage);
addTimestamp(errorMessage);
addHostname(errorMessage);
addRawMessages(errorMessage);
addErrorHash(errorMessage);
+ addMetadata(errorMessage);
return errorMessage;
}
+ private void addFailedSensorType(JSONObject errorMessage) {
+ if (sensorTypes.size() == 1) {
+ errorMessage.put(ErrorFields.FAILED_SENSOR_TYPE.getName(),
sensorTypes.iterator().next());
+ } else {
+ errorMessage.put(ErrorFields.FAILED_SENSOR_TYPE.getName(), new
JSONArray().addAll(sensorTypes));
+ }
+ }
+
@SuppressWarnings({"unchecked"})
private void addMessageString(JSONObject errorMessage) {
if (message != null) {
@@ -192,44 +204,31 @@ public class MetronError {
}
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
+ private void addMetadata(JSONObject errorMessage) {
+ if(metadata != null && metadata.keySet().size() > 0) {
+ // Add each metadata element directly to the message. Each metadata key already has
+ // a standard prefix, so there is no need to add another prefix to avoid collisions.
+ // This mimics the behavior of merging metadata.
+ errorMessage.putAll(metadata);
}
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof MetronError)) return false;
MetronError that = (MetronError) o;
-
- if (message != null ? !message.equals(that.message) : that.message !=
null) {
- return false;
- }
- if (getThrowable() != null ? !getThrowable().equals(that.getThrowable())
- : that.getThrowable() != null) {
- return false;
- }
- if (sensorTypes != null ? !sensorTypes.equals(that.sensorTypes) :
that.sensorTypes != null) {
- return false;
- }
- if (errorType != that.errorType) {
- return false;
- }
- if (errorFields != null ? !errorFields.equals(that.errorFields) :
that.errorFields != null) {
- return false;
- }
- return rawMessages != null ? rawMessages.equals(that.rawMessages) :
that.rawMessages == null;
+ return Objects.equals(message, that.message) &&
+ Objects.equals(throwable, that.throwable) &&
+ Objects.equals(sensorTypes, that.sensorTypes) &&
+ errorType == that.errorType &&
+ Objects.equals(errorFields, that.errorFields) &&
+ Objects.equals(rawMessages, that.rawMessages) &&
+ Objects.equals(metadata, that.metadata);
}
@Override
public int hashCode() {
- int result = message != null ? message.hashCode() : 0;
- result = 31 * result + (getThrowable() != null ? getThrowable().hashCode()
: 0);
- result = 31 * result + (sensorTypes != null ? sensorTypes.hashCode() : 0);
- result = 31 * result + (errorType != null ? errorType.hashCode() : 0);
- result = 31 * result + (errorFields != null ? errorFields.hashCode() : 0);
- result = 31 * result + (rawMessages != null ? rawMessages.hashCode() : 0);
- return result;
+ return Objects.hash(message, throwable, sensorTypes, errorType,
errorFields, rawMessages, metadata);
}
}
diff --git
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
index 177a232..294d6dc 100644
---
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
+++
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
@@ -18,6 +18,7 @@
package org.apache.metron.common.error;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Sets;
@@ -25,6 +26,9 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.metron.common.Constants;
import org.json.simple.JSONObject;
@@ -118,4 +122,47 @@ public class MetronErrorTest {
assertEquals(Sets.newHashSet("field1", "field2"),
Sets.newHashSet(((String)
errorJSON.get(Constants.ErrorFields.ERROR_FIELDS.getName())).split(",")));
assertEquals("04a2629c39e098c3944be85f35c75876598f2b44b8e5e3f52c59fa1ac182817c",
errorJSON.get(Constants.ErrorFields.ERROR_HASH.getName()));
}
+
+ @Test
+ public void shouldIncludeMessageMetadata() {
+ // the metadata that should be included in the error message
+ Map<String, Object> metadata = new HashMap<>();
+ metadata.put("metron.metadata.topic", "bro");
+ metadata.put("metron.metadata.partition", 0);
+ metadata.put("metron.metadata.offset", 123);
+
+ JSONObject message = new JSONObject();
+ message.put("field1", "value1");
+ message.put("field2", "value2");
+
+ MetronError error = new MetronError()
+ .addRawMessage(message)
+ .withMetadata(metadata);
+
+ // expect the metadata to be flattened and folded into the error message
+ JSONObject errorMessage = error.getJSONObject();
+ assertEquals("bro", errorMessage.get("metron.metadata.topic"));
+ assertEquals(0, errorMessage.get("metron.metadata.partition"));
+ assertEquals(123, errorMessage.get("metron.metadata.offset"));
+ }
+
+ @Test
+ public void shouldNotIncludeEmptyMetadata() {
+ // there is no metadata
+ Map<String, Object> metadata = new HashMap<>();
+
+ JSONObject message = new JSONObject();
+ message.put("field1", "value1");
+ message.put("field2", "value2");
+
+ MetronError error = new MetronError()
+ .addRawMessage(message)
+ .withMetadata(metadata);
+
+ // expect no metadata fields to be present in the error message
+ JSONObject errorMessage = error.getJSONObject();
+ assertFalse(errorMessage.containsKey("metron.metadata.topic"));
+ assertFalse(errorMessage.containsKey("metron.metadata.partition"));
+ assertFalse(errorMessage.containsKey("metron.metadata.offset"));
+ }
}
diff --git a/metron-platform/metron-parsing/README.md
b/metron-platform/metron-parsing/README.md
index 9bbd39f..b8f44cb 100644
--- a/metron-platform/metron-parsing/README.md
+++ b/metron-platform/metron-parsing/README.md
@@ -165,13 +165,13 @@ Errors, which are defined as unexpected exceptions
happening during the
parse, are sent along to the error queue with a message indicating that
there was an error in parse along with a stacktrace. This is to
distinguish from the invalid messages.
-
+
## Filtered
One can also filter a message by specifying a `filterClassName` in the
parser config. Filtered messages are just dropped rather than passed
through.
-
+
## Parser Architecture

@@ -180,7 +180,7 @@ Data flows through the parser via kafka and into the
`enrichments`
topology in kafka. Errors are collected with the context of the error
(e.g. stacktrace) and original message causing the error and sent to an
`error` queue. Invalid messages as determined by global validation
-functions are also treated as errors and sent to an `error` queue.
+functions are also treated as errors and sent to an `error` queue.
## Message Format
@@ -218,7 +218,7 @@ So putting it all together a typical Metron message with
all 5-tuple fields pres
}
```
-## Global Configuration
+## Global Configuration
There are a few properties which can be managed in the global configuration
that have pertinence to
parsers and parsing in general.
@@ -261,7 +261,7 @@ The document is structured in the following way
}
```
-* `sensorTopic` : The kafka topic to send the parsed messages to. If the
topic is prefixed and suffixed by `/`
+* `sensorTopic` : The kafka topic to send the parsed messages to. If the
topic is prefixed and suffixed by `/`
then it is assumed to be a regex and will match any topic matching the pattern
(e.g. `/bro.*/` would match `bro_cust0`, `bro_cust1` and `bro_cust2`)
* `readMetadata` : Boolean indicating whether to read metadata or not (The
default is raw message strategy dependent). See below for a discussion about
metadata.
* `mergeMetadata` : Boolean indicating whether to merge metadata with the
message or not (The default is raw message strategy dependent). See below for
a discussion about metadata.
@@ -291,7 +291,7 @@ then it is assumed to be a regex and will match any topic
matching the pattern (
```
The `fieldTransformations` is a complex object which defines a
-transformation which can be done to a message. This transformation can
+transformation which can be done to a message. This transformation can
* Modify existing fields to a message
* Add new fields given the values of existing fields of a message
* Remove existing fields of a message
@@ -303,7 +303,7 @@ For platform specific configs, see the README of the
appropriate project. This w
Metadata is a useful thing to send to Metron and use during enrichment or
threat intelligence.
Consider the following scenarios:
-* You have multiple telemetry sources of the same type that you want to
+* You have multiple telemetry sources of the same type that you want to
* ensure downstream analysts can differentiate
* ensure profiles consider independently as they have different
seasonality or some other fundamental characteristic
@@ -311,7 +311,7 @@ As such, there are two types of metadata that we seek to
support in Metron:
* Environmental metadata : Metadata about the system at large
* Consider the possibility that you have multiple kafka topics being
processed by one parser and you want to tag the messages with the kafka topic
* At the moment, only the kafka topic is kept as the field name.
-* Custom metadata: Custom metadata from an individual telemetry source that
one might want to use within Metron.
+* Custom metadata: Custom metadata from an individual telemetry source that
one might want to use within Metron.
Metadata is controlled by the following parser configs:
* `rawMessageStrategy` : This is a strategy which indicates how to read data
and metadata. The strategies supported are:
@@ -324,7 +324,7 @@ Metadata is controlled by the following parser configs:
* `ENVELOPE`
* `metadataPrefix` defines the key prefix for metadata (default is
`metron.metadata`)
* `messageField` defines the field from the envelope to use as the
data. All other fields are considered metadata.
-* `readMetadata` : This is a boolean indicating whether metadata will be read
and made available to Field
+* `readMetadata` : This is a boolean indicating whether metadata will be read
and made available to Field
transformations (i.e. Stellar field transformations). The default is
dependent upon the `rawMessageStrategy`:
* `DEFAULT` : default to `false`.
@@ -350,7 +350,31 @@ For instance, sending a metadata field called
`customer_id` could be done by sen
in the kafka key. This would be exposed as the field
`metron.metadata.customer_id` to stellar field transformations
as well, if `mergeMetadata` is `true`, available as a field in its own right.
+#### Metadata and Error Handling
+
+When a telemetry message fails to parse correctly, a separate error message is
produced and sent to the error topic. This error message will contain detailed
information to reflect the error that occurred.
+
+If the telemetry message that failed contains metadata, this metadata is
included in the error message. For example, here is an error message that
contains two metadata fields: `metron.metadata.topic` and
`metron.metadata.customer`.
+
+```
+{
+ "exception": "java.lang.IllegalStateException: Unable to parse Message:
\"this is an invalid synthetic message\" }",
+ "stack": "java.lang.IllegalStateException: Unable to parse Message: \"this
is an invalid synthetic message\" ...\n",
+ "raw_message": "\"this is an invalid synthetic message\" }",
+ "error_hash":
"3d498968e8df7f28d05db3037d4ad2a3a0095c22c14d881be45fac3f184dbcc3",
+ "message": "Unable to parse Message: \"this is an invalid synthetic
message\" }",
+ "source.type": "error",
+ "failed_sensor_type": "bro",
+ "hostname": "node1",
+ "error_type": "parser_error",
+ "guid": "563d8d2a-1493-4758-be2f-5613bfd2d615",
+ "timestamp": 1548366516634,
+ "metron.metadata.topic": "bro",
+ "metron.metadata.customer": "acme-inc"
+}
+```
+By default, error messages are sent to the `indexing` topic. This will cause
the errors to be indexed in whichever endpoints you have configured, namely
Solr, Elasticsearch, and HDFS. You may need to update your configuration of
these endpoints to accurately reflect the metadata fields contained in the
error message. For example, you may need to update the schema definition of
your Solr Collection for the metadata fields to be accurately indexed in the
Error collection.
### `fieldTransformation` configuration
@@ -359,9 +383,9 @@ The format of a `fieldTransformation` is as follows:
* `output` : The outputs to produce from the transformation. If unspecified,
it is assumed to be the same as inputs.
* `transformation` : The fully qualified classname of the transformation to be
used. This is either a class which implements `FieldTransformation` or a
member of the `FieldTransformations` enum.
* `config` : A String to Object map of transformation specific configuration.
-
+
The currently implemented fieldTransformations are:
-* `REMOVE` : This transformation removes the specified input fields. If you
want a conditional removal, you can pass a Metron Query Language statement to
define the conditions under which you want to remove the fields.
+* `REMOVE` : This transformation removes the specified input fields. If you
want a conditional removal, you can pass a Metron Query Language statement to
define the conditions under which you want to remove the fields.
Consider the following simple configuration which will remove `field1`
unconditionally:
@@ -396,7 +420,7 @@ The currently implemented fieldTransformations are:
}
```
-* `SELECT`: This transformation filters the fields in the message to include
only the configured output fields, and drops any not explicitly included.
+* `SELECT`: This transformation filters the fields in the message to include
only the configured output fields, and drops any not explicitly included.
For example:
@@ -459,7 +483,7 @@ and the values for the config map are the associated new
field name.
```
* `REGEX_SELECT` : This transformation lets users set an output field to one
of a set of possibilities based on matching regexes. This transformation is
useful when the number or conditions are large enough to make a stellar
language match statement unwieldy.
-
+
The following config will set the field `logical_source_type` to one of the
following, dependent upon the value of the `pix_type` field:
* `cisco-6-302` if `pix_type` starts with either `6-302` or `06-302`
@@ -659,4 +683,3 @@ from your parser topology.
- [JSON Path concept](http://goessner.net/articles/JsonPath/)
- [Read about JSON Path library Apache Metron
uses](https://github.com/json-path/JsonPath)
- [Try JSON Path expressions online](http://jsonpath.herokuapp.com)
-
diff --git
a/metron-platform/metron-parsing/metron-parsers-common/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
b/metron-platform/metron-parsing/metron-parsers-common/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
index df3ed1c..dfad188 100644
---
a/metron-platform/metron-parsing/metron-parsers-common/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
+++
b/metron-platform/metron-parsing/metron-parsers-common/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
@@ -162,6 +162,7 @@ public class ParserRunnerImpl implements
ParserRunner<JSONObject>, Serializable
.withErrorType(Constants.ErrorType.PARSER_ERROR)
.withThrowable(throwable)
.withSensorType(Collections.singleton(sensorType))
+ .withMetadata(rawMessage.getMetadata())
.addRawMessage(rawMessage.getMessage())));
// If exceptions are thrown by the MessageParser, wrap them with
MetronErrors and add them to the list of errors
@@ -169,6 +170,7 @@ public class ParserRunnerImpl implements
ParserRunner<JSONObject>, Serializable
.withErrorType(Constants.ErrorType.PARSER_ERROR)
.withThrowable(entry.getValue())
.withSensorType(Collections.singleton(sensorType))
+ .withMetadata(rawMessage.getMetadata())
.addRawMessage(entry.getKey())).collect(Collectors.toList()));
}
} else {
@@ -264,6 +266,7 @@ public class ParserRunnerImpl implements
ParserRunner<JSONObject>, Serializable
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.PARSER_INVALID)
.withSensorType(Collections.singleton(sensorType))
+ .withMetadata(rawMessage.getMetadata())
.addRawMessage(message);
Set<String> errorFields = failedValidators == null ? null :
failedValidators.stream()
.flatMap(fieldValidator -> fieldValidator.getInput().stream())
diff --git
a/metron-platform/metron-parsing/metron-parsers-common/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
b/metron-platform/metron-parsing/metron-parsers-common/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
index 29a625d..2d04d40 100644
---
a/metron-platform/metron-parsing/metron-parsers-common/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
+++
b/metron-platform/metron-parsing/metron-parsers-common/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
@@ -327,9 +327,14 @@ public class ParserRunnerImplTest {
@Test
public void shouldReturnMetronErrorOnInvalidMessage() {
+ Map<String, Object> metadata = new HashMap<>();
+ metadata.put("metron.metadata.topic", "bro");
+ metadata.put("metron.metadata.partition", 0);
+ metadata.put("metron.metadata.offset", 123);
+
JSONObject inputMessage = new JSONObject();
inputMessage.put("guid", "guid");
- RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new
HashMap<>());
+ RawMessage rawMessage = new RawMessage("raw_message".getBytes(), metadata);
JSONObject expectedOutput = new JSONObject();
expectedOutput.put("guid", "guid");
@@ -337,6 +342,7 @@ public class ParserRunnerImplTest {
MetronError expectedMetronError = new MetronError()
.withErrorType(Constants.ErrorType.PARSER_INVALID)
.withSensorType(Collections.singleton("bro"))
+ .withMetadata(metadata)
.addRawMessage(inputMessage);
when(stellarFilter.emit(expectedOutput,
parserRunner.getStellarContext())).thenReturn(true);
@@ -355,11 +361,16 @@ public class ParserRunnerImplTest {
@Test
public void shouldReturnMetronErrorOnFailedFieldValidator() {
+ Map<String, Object> metadata = new HashMap<>();
+ metadata.put("metron.metadata.topic", "bro");
+ metadata.put("metron.metadata.partition", 0);
+ metadata.put("metron.metadata.offset", 123);
+
JSONObject inputMessage = new JSONObject();
inputMessage.put("guid", "guid");
inputMessage.put("ip_src_addr", "test");
inputMessage.put("ip_dst_addr", "test");
- RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new
HashMap<>());
+ RawMessage rawMessage = new RawMessage("raw_message".getBytes(), metadata);
JSONObject expectedOutput = new JSONObject();
expectedOutput.put("guid", "guid");
@@ -370,6 +381,7 @@ public class ParserRunnerImplTest {
.withErrorType(Constants.ErrorType.PARSER_INVALID)
.withSensorType(Collections.singleton("bro"))
.addRawMessage(inputMessage)
+ .withMetadata(metadata)
.withErrorFields(new HashSet<>(Arrays.asList("ip_src_addr",
"ip_dst_addr")));
when(stellarFilter.emit(expectedOutput,
parserRunner.getStellarContext())).thenReturn(true);