[
https://issues.apache.org/jira/browse/METRON-694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15863648#comment-15863648
]
ASF GitHub Bot commented on METRON-694:
---------------------------------------
GitHub user merrimanr opened a pull request:
https://github.com/apache/incubator-metron/pull/453
METRON-694: Index Errors from Topologies
This PR addresses METRON-695, including updates to the Ambari MPack. A
summary of the changes:
- Defaulted FieldValidator.input to empty list
- Added MetronError abstraction, updated ErrorUtils
- Added MetronGetStrategy abstraction
- Updated Constants to only have 1 error topic and added error field and
error type enums
- Updated flux property files for enrichment and indexing topologies to
send errors to single error topic
- Added new error topology including start script, index zookeeper config,
flux file and flux property file
- Updated integration tests with single error topic and error handling test
cases where applicable
- Updated unit tests to cover new error handling code
- Removed invalid data fields, invalid is considered error now
- Exposed Kafka offset as a parameter in ParserTopologyComponent
- Added error indexing topology to Ambari MPack
There are 2 areas of review that I would like to highlight. The first is
the abstraction for handling errors in the various Metron topologies. Error
handling logic is not consistent across topologies, ranging from calling the
ErrorUtils.handleError method to simply logging the exception. The
ErrorUtils.handleError was the most common method used so I decided to extend
that to accommodate other bolts and topologies. As I worked through the
details I found myself either having to add several additional
ErrorUtils.handleError methods to cover all the different combinations of error
message properties or creating several empty Optional objects, making the code
more verbose and confusing than it should be. So I moved the error message
generating logic previously in ErrorUtils.generateErrorMessage to a separate
MetronError class that follows a builder/fluent design pattern. With this
change, capturing an error now follows this pattern:
MetronError error = new MetronError()
.withErrorProperty(errorProperty) // error properties could
include caught exception, raw message(s), error type, etc
... // add as many properties as needed
ErrorUtils.handleError(collector, error);
I expect there will be many opinions about the correct approach here.
The other abstraction involves retrieving a message from a tuple. The
primary driver for this is logging the original message that caused a failure.
The BulkWriterComponent class is passed both a tuple and a message which is
then passed to the BulkMessageWriter.write method. There are 2 challenges with
this. The first is that the returned BulkWriterResponse object only contains
the tuples that failed and not the messages. The other is that a message could
have been transformed before being passed to the BulkWriterComponent so we need
a way to get the original message again in case of a failure so that it may be
replayed. To solve this, I created a MessageGetStrategy interface that can be
passed around between components to retrieve the original message if needed.
This could become a useful abstraction for other uses cases as well, for
example making the BulkMessageWriterBolt configurable through flux.
All applicable unit and integration tests were updated (or created if they
didn't already exist). This can be tested in Quick Dev by editing the
elasticsearch_error.properties to match the environment and starting the
topology with the start_elasticsearch_error_topology.sh script. The Bro sensor
in that environment includes messages that fail to parse so error messages
should be generated by default as long as that sensor is running. These steps
are manual because I'm assuming Ansible is being deprecated. For a more
automated approach, this can (and should) be tested with the Ambari MPack. I
am still trying to think of ways to simulate errors in other parts of the
topology so if anyone has ideas let me know.
I apologize for the size of this PR but error handling cuts across many
different modules and classes and required a lot of updates so it was
unavoidable. I decided to not update the profiler topology yet because it does
not include a bolt for handling errors like the other topologies. This would
require changes to the profiler topology architecture, adding to an already
very large PR. I am happy to take that on in a separate pull request or this
one if everyone feels it should be included now.
A special thanks to @justinleet for helping me with the Ambari MPack and
@cestella for helping with the MessageGetStrategy abstraction (that he had
already started with MessageGetters). Looking forward to some feedback!
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/merrimanr/incubator-metron METRON-695
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-metron/pull/453.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #453
----
commit 4a329ed241f60057136de00e1d716fb81d32d337
Author: rmerriman <[email protected]>
Date: 2017-02-06T21:19:45Z
Initial commit
commit a05fdcdd56ff4c1ee33a643ec065be352d88cd3b
Author: rmerriman <[email protected]>
Date: 2017-02-06T21:20:24Z
Merge remote-tracking branch 'mirror/master' into METRON-695
commit a24e621d79a105ebe1a1c69d0fb1601d7940f96b
Author: rmerriman <[email protected]>
Date: 2017-02-08T00:22:16Z
Updated tests to include error conditions
commit 6b897d5afadde00870e2579af9c3c57d9ef9d076
Author: rmerriman <[email protected]>
Date: 2017-02-08T00:22:47Z
Added error topology to Ambari MPack
----
> Index Errors from Topologies
> ----------------------------
>
> Key: METRON-694
> URL: https://issues.apache.org/jira/browse/METRON-694
> Project: Metron
> Issue Type: Bug
> Reporter: Ryan Merriman
>
> Need to make sure (and review) that all the bolts write into the error queue.
> Errors should then be consumed from the error queue and indexed.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)