merrimanr opened a new pull request #1330: METRON-1968: Messages are lost when a parser produces multiple messages and batch size is greater than 1 URL: https://github.com/apache/metron/pull/1330 ## Contributor Comments This PR represents a fairly significant shift in the Writer class architecture. Currently these classes do not support tuples that result in multiple messages, mainly due to a limitation in the `BulkMessageWriter` interface. The `write` method accepts separate lists of tuples and messages so there is no way to know which tuples are associated with which message. It has worked so far with parsers that only emit a single message from a tuple because a 1 to 1 relationship is assumed in these classes. I experimented with several different approaches to fixing this and tried to follow a strategy that would avoid having to significantly rewrite multiple classes (this was unavoidable in certain places). I change the `BulkMessageWriter.write` interface from: ``` BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<MESSAGE_T> messages) throws Exception; ``` to: ``` BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Map<String, MESSAGE_T> messages) throws Exception; ``` The messages being passed in are now represented as a map where the keys are message ids and the values are the messages. This made changes to the `BulkMessageWriter` implementations easier because they continue to assume a 1 to 1 relationship between message ids (formerly tuples) and messages. Now the writers report which message ids succeeded/failed instead of tuples. The `BulkWriterComponent` continues track batch sizes and timeouts but no longer manages tuples or error handling. Instead it calls `BulkWriterResponseHandler.handleFlush` when messages are flushed. This interface is injected into `BulkWriterComponent` and allows us to properly commit messages according to the requirements of the platform the classes are running on. For now a Storm implementation of `BulkWriterResponseHandler` is setup and passed into the `BulkWriterComponent`. Managing tuple to message relationships as well as tuple acking and error handling are now done in a single class. The bolts now have to setup a `StormBulkWriterResponseHandler` object and add tuples/messages to it. ### Changes Included - The `BulkMessageWriter` is updated and all implementations have been adjusted to conform to the new interfaces. Changes to the implementations are small and straightforward. - Tuple acking and error handling is moved out of `BulkWriterComponent` and into `StormBulkWriterResponseHandler`. - Bolts have been updated with the new `StormBulkWriterResponseHandler` pattern - I noticed `HBaseWriter` and `PcapWriter are no longer being used. Rather than go to the trouble of updating their tests I removed them. - Removed `WriterHandler.handleAck` since acks are no longer handled here. - Added logging to make troubleshooting writer issues easier There were also several significant changes that needed to be made to the tests: - The writer tests are updated to match the new `BulkMessageWriter.write` interface. In most cases these changes were simple however I noticed there were no unit tests for `KafkaWriter.write` so I added them. - The parser integration tests now verify the all tuples were acked. - There is now a parser integration test that simulates the use case described in this Jira. The `jsonMapQuery` parser integration test now produces multiple messages from a single tuple and sets the batch size to 5. - A test was added for `StormBulkWriterResponseHandler` that also simulates the use case described in this Jira. Error handling tests that were originally in `BulkWriterComponentTest` were migrated here. ### Testing This has been tested in full dev both for regression and for the use case described in this Jira. There are 3 test cases: - bro sensor with a batch size set to 5 - snort sensor with a batch size set to 1 - jsonMapQuery sensor that produces multiple messages from a tuple with a batch size set to both less than and greater than the number of messages #### Setup 1. Spin up full dev and verify data is landing in both Elasticsearch and HDFS. 2. Stop all sensors with `service sensor-stubs stop`. 3. Create the jsonMapQuery sensor topic: ``` /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper node1:2181 --create --topic jsonMapQuery --partitions 1 --replication-factor 1 ``` 4. Copy the jsonMapQuery sample data to full dev: ``` cd /metron-deployment/development/centos6 vagrant scp ../../../metron-platform/metron-integration-test/src/main/sample/data/jsonMapQuery/raw/jsonMapExampleOutput /tmp ``` 5. Stop Metron Parsers in Ambari. 6. Navigate to Ambari > Metron > Configs > Parsers and change "Metron Parsers" from `bro,snort,yaf` to `bro,snort,jsonMapQuery`. 7. Start Metron Parsers in Ambari. 8. Clear the Elasticsearch indices: ``` curl -XDELETE http://node1:9200/bro_index* curl -XDELETE http://node1:9200/snort_index* ``` 9. Stop Metron Indexing in Ambari. 10. Clear the HDFS files: ``` su hdfs -c "hdfs dfs -rm /apps/metron/indexing/indexed/bro/*" su hdfs -c "hdfs dfs -rm /apps/metron/indexing/indexed/snort/*" ``` 11. Start Metron Indexing in Ambari. 12. Verify there is no bro or snort data in Elasticsearch or HDFS: ``` [root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep bro [root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep snort [root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/bro/*.json | wc -l cat: `/apps/metron/indexing/indexed/bro/*.json': No such file or directory 0 [root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/snort/*.json | wc -l cat: `/apps/metron/indexing/indexed/snort/*.json': No such file or directory 0 ``` 13. Each topology (parsing, enrichment, indexing) can be configured with a batch size and timeout. The batch timeout defaults to 1/2 the tuple timeout so tests described next should be done reasonably quick (before the timeout happens). To make the test results easier to understand we also need to adjust the batch sizes to 1 for topologies we are not directly testing. Set the bro parser batch size to 1: ``` curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{ "parserClassName": "org.apache.metron.parsers.bro.BasicBroParser", "sensorTopic": "bro", "parserConfig": { "batchSize": 1 } }' 'http://user:password@node1:8082/api/v1/sensor/parser/config/bro' ``` 14. Set the snort parser batch size to 1: ``` curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{ "parserClassName": "org.apache.metron.parsers.snort.BasicSnortParser", "sensorTopic": "snort", "parserConfig": { "batchSize": 1 } }' 'http://user:password@node1:8082/api/v1/sensor/parser/config/snort' ``` 15. Set the enrichment batch size to 1: ``` curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{ "es.clustername": "metron", "es.ip": "node1:9200", "es.date.format": "yyyy.MM.dd.HH", "parser.error.topic": "indexing", "update.hbase.table": "metron_update", "update.hbase.cf": "t", "es.client.settings": {}, "profiler.client.period.duration": "15", "profiler.client.period.duration.units": "MINUTES", "user.settings.hbase.table": "user_settings", "user.settings.hbase.cf": "cf", "bootstrap.servers": "node1:6667", "source.type.field": "source:type", "threat.triage.score.field": "threat:triage:score", "enrichment.writer.batchSize": "1", "profiler.writer.batchSize": "15", "profiler.writer.batchTimeout": "0", "geo.hdfs.file": "/apps/metron/geo/default/GeoLite2-City.tar.gz", "asn.hdfs.file": "/apps/metron/asn/default/GeoLite2-ASN.tar.gz" }' 'http://user:password@node1:8082/api/v1/global/config' ``` 16. Set the jsonMapQuery indexing batch size to 1 (since we will test this sensor in parsers): ``` curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{ "hdfs": { "index": "jsonmapquery", "batchSize": 1, "enabled": true }, "elasticsearch": { "index": "jsonmapquery", "batchSize": 1, "enabled": true }, "solr": { "index": "jsonmapquery", "batchSize": 1, "enabled": false } }' 'http://user:password@node1:8082/api/v1/sensor/indexing/config/jsonMapQuery' ``` It can be helpful to keep a Kafka console consumer running for each topic: ``` /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic bro /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic snort /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic jsonMapQuery /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic enrichments /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic indexing ``` #### bro 1. Bro has a batch size of 5 in indexing by default. Write 4 bro messages: ``` shuf -n 4 /opt/sensor-stubs/data/bro.out | sed -e "s/\"ts\"\:[0-9]\+\./\"ts\"\:`date +%s`\./g" | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic bro ``` There should be no data in Elasticsearch or HDFS: ``` [root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep bro [root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/bro/*.json | wc -l cat: `/apps/metron/indexing/indexed/bro/*.json': No such file or directory 0 ``` 2. Write an additional bro message: ``` shuf -n 1 /opt/sensor-stubs/data/bro.out | sed -e "s/\"ts\"\:[0-9]\+\./\"ts\"\:`date +%s`\./g" | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic bro ``` There should now be 5 messages in Elasticsearch and HDFS: ``` [root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep bro bro_index_2019.02.07.22 5 [root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/bro/*.json | wc -l 5 ``` #### snort 1. Snort has a batch size of 1 in indexing by default: Write a snort message: ``` shuf -n 1 /opt/sensor-stubs/data/snort.out | sed -e "s/[^,]\+ ,/`date +'%m\/%d\/%y-%H:%M:%S'`.000000 ,/g" | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic snort ``` There should now be 1 message in Elasticsearch and HDFS: ``` [root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep snort snort_index_2019.02.07.22 1 [root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/snort/*.json | wc -l 1 ``` #### jsonMapQuery 1. The jsonMapQuery sensor produces 10 messages for every tuple. Set the batch size to 5 in the parser topology: ``` curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{ "parserClassName": "org.apache.metron.parsers.json.JSONMapParser", "sensorTopic": "jsonMapQuery", "parserConfig": { "jsonpQuery": "$.foo", "batchSize": 5 } }' 'http://user:password@node1:8082/api/v1/sensor/parser/config/jsonMapQuery' ``` 2. Writer a jsonMapQuery message: ``` cat /tmp/jsonMapExampleOutput | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic jsonMapQuery ``` This should cause the issue described in the Jira. Before there was only a single message being written for each tuple. With this PR there should be 10 messages in Elasticsearch and HDFS: ``` [root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep jsonmapquery jsonmapquery_index_2019.02.07.22 10 [root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/jsonMapQuery/*.json | wc -l 10 ``` 3. Change the batch size to 15: ``` curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{ "parserClassName": "org.apache.metron.parsers.json.JSONMapParser", "sensorTopic": "jsonMapQuery", "parserConfig": { "jsonpQuery": "$.foo", "batchSize": 15 } }' 'http://user:password@node1:8082/api/v1/sensor/parser/config/jsonMapQuery' ``` 4. Writer another jsonMapQuery message: ``` cat /tmp/jsonMapExampleOutput | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic jsonMapQuery ``` There should still be 10 messages in Elasticsearch and HDFS (no additional messages written): ``` [root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep jsonmapquery jsonmapquery_index_2019.02.07.22 10 [root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/jsonMapQuery/*.json | wc -l 10 ``` 5. Writer another jsonMapQuery message: ``` cat /tmp/jsonMapExampleOutput | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic jsonMapQuery ``` This should cause a batch to flush so we should see 15 additional messages: ``` [root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep jsonmapquery jsonmapquery_index_2019.02.07.22 25 [root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/jsonMapQuery/*.json | wc -l 25 ``` 6. Writer another jsonMapQuery message: ``` cat /tmp/jsonMapExampleOutput | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic jsonMapQuery ``` This should cause another batch to flush since there were 5 messages still in the batch. We should see another 15 messages added: ``` [root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep jsonmapquery jsonmapquery_index_2019.02.07.22 40 [root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/jsonMapQuery/*.json | wc -l 40 ``` The various topologies can be tested by adjusting their batch sizes and timeouts. The previous instructions focus on testing batch sizes for bro in indexing, snort in indexing and jsonMapQuery in parsing. Other topologies and scenarios can and should be tested with different batch sizes and timeouts. ### Feedback Requested Outside of the standard code review and function testing, they are a few areas that are not 100% clear and I would like feedback on: - Is the fundamental architectural approach solid? Are there any holes I'm not thinking of? - The message ids generated by the bolts and used to track messages in the writer classes are Java UUIDs. Is this good enough? Any id that uniquely identifies a message could be used here. Is there something that would perform better? - There is the potential for messages to pile up in `StormBulkWriterResponseHandler` (and probably other `BulkWriterComponent` too) when tuples time out and are replayed. I think this is also an issue now but should we explore some kind of cache that can evict messages after timeouts? - I have added javadocs and am planning on taking another pass and adding more. Are there areas you feel need more/better explanation? My goal is for these classes to be easy to understand and navigate. Is the interaction between the bolts and response handler clear? ## Pull Request Checklist Thank you for submitting a contribution to Apache Metron. Please refer to our [Development Guidelines](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=61332235) for the complete guide to follow for contributions. Please refer also to our [Build Verification Guidelines](https://cwiki.apache.org/confluence/display/METRON/Verifying+Builds?show-miniview) for complete smoke testing guides. In order to streamline the review of the contribution we ask you follow these guidelines and ask you to double check the following: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? If not one needs to be created at [Metron Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel). - [x] Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? ### For code changes: - [x] Have you included steps to reproduce the behavior or problem that is being changed or addressed? - [x] Have you included steps or a guide to how the change may be verified and tested manually? - [x] Have you ensured that the full suite of tests and checks have been executed in the root metron folder via: ``` mvn -q clean integration-test install && dev-utilities/build-utils/verify_licenses.sh ``` - [x] Have you written or updated unit tests and or integration tests to verify your changes? - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [x] Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent? ### For documentation related changes: - [x] Have you ensured that format looks appropriate for the output in which it is rendered by building and verifying the site-book? If not then run the following commands and the verify changes via `site-book/target/site/index.html`: ``` cd site-book mvn site ``` #### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. It is also recommended that [travis-ci](https://travis-ci.org) is set up for your personal repository such that your branches are built there before submitting a pull request.
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
