merrimanr opened a new pull request #1330: METRON-1968: Messages are lost when 
a parser produces multiple messages and batch size is greater than 1
URL: https://github.com/apache/metron/pull/1330
 
 
   ## Contributor Comments
   This PR represents a fairly significant shift in the Writer class 
architecture.  Currently these classes do not support tuples that result in 
multiple messages, mainly due to a limitation in the `BulkMessageWriter` 
interface.  The `write` method accepts separate lists of tuples and messages so 
there is no way to know which tuples are associated with which message.  It has 
worked so far with parsers that only emit a single message from a tuple because 
a 1 to 1 relationship is assumed in these classes.
   
   I experimented with several different approaches to fixing this and tried to follow a strategy that would avoid significantly rewriting multiple classes (this was unavoidable in certain places).  I changed the `BulkMessageWriter.write` interface from:
   ```
   BulkWriterResponse write(String sensorType, WriterConfiguration 
configurations, Iterable<Tuple> tuples, List<MESSAGE_T> messages) throws 
Exception;
   ```
   to:
   ```
   BulkWriterResponse write(String sensorType, WriterConfiguration 
configurations, Map<String, MESSAGE_T> messages) throws Exception;
   ```
   The messages being passed in are now represented as a map where the keys are 
message ids and the values are the messages.  This made changes to the 
`BulkMessageWriter` implementations easier because they continue to assume a 1 
to 1 relationship between message ids (formerly tuples) and messages.  Now the 
writers report which message ids succeeded/failed instead of tuples.  The 
`BulkWriterComponent` continues to track batch sizes and timeouts but no longer manages tuples or error handling.  Instead, it calls `BulkWriterResponseHandler.handleFlush` when messages are flushed.  This interface is injected into `BulkWriterComponent` and allows us to properly commit messages according to the requirements of the platform the classes are running on.  For now, a Storm implementation of `BulkWriterResponseHandler` is set up and passed into the `BulkWriterComponent`.  Managing tuple-to-message relationships, tuple acking, and error handling are now done in a single class.  The bolts now have to set up a `StormBulkWriterResponseHandler` object and add tuples/messages to it.
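   To make the shape of the change concrete, here is a minimal, self-contained sketch of the new contract.  The `Response` and `write` below are simplified stand-ins, not the real Metron classes (the actual `BulkWriterResponse` and `WriterConfiguration` carry more state); the point is that messages are keyed by id, so two messages produced from the same tuple remain independently trackable:

   ```java
   import java.util.*;

   // Hypothetical, simplified illustration of the new write contract.
   public class WriterSketch {

       // Stand-in for BulkWriterResponse: per-message-id success/failure.
       public static class Response {
           public final Set<String> successes = new HashSet<>();
           public final Map<String, Exception> errors = new HashMap<>();
       }

       // New-style write: a map keyed by message id replaces the old
       // parallel Iterable<Tuple> / List<MESSAGE_T> arguments.
       public static Response write(String sensorType, Map<String, String> messages) {
           Response response = new Response();
           for (Map.Entry<String, String> e : messages.entrySet()) {
               if (e.getValue() != null) {
                   response.successes.add(e.getKey());
               } else {
                   response.errors.put(e.getKey(),
                       new IllegalArgumentException("null message"));
               }
           }
           return response;
       }

       public static void main(String[] args) {
           // Two messages from the same tuple get distinct ids (e.g. UUIDs),
           // so the writer can report success/failure per message.
           Map<String, String> batch = new LinkedHashMap<>();
           batch.put(UUID.randomUUID().toString(), "{\"ip_src_addr\":\"10.0.0.1\"}");
           batch.put(UUID.randomUUID().toString(), "{\"ip_src_addr\":\"10.0.0.2\"}");
           Response r = write("jsonMapQuery", batch);
           System.out.println(r.successes.size() + " succeeded, "
                   + r.errors.size() + " failed");
       }
   }
   ```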
   
   ### Changes Included
   
   - The `BulkMessageWriter` is updated and all implementations have been 
adjusted to conform to the new interfaces.  Changes to the implementations are 
small and straightforward.
   - Tuple acking and error handling is moved out of `BulkWriterComponent` and 
into `StormBulkWriterResponseHandler`. 
   - Bolts have been updated with the new `StormBulkWriterResponseHandler` 
pattern
   - I noticed `HBaseWriter` and `PcapWriter` are no longer being used.  Rather than go to the trouble of updating their tests, I removed them.
   - Removed `WriterHandler.handleAck` since acks are no longer handled here.
   - Added logging to make troubleshooting writer issues easier
   
   There were also several significant changes that needed to be made to the 
tests:
   
   - The writer tests are updated to match the new `BulkMessageWriter.write` interface.  In most cases these changes were simple; however, I noticed there were no unit tests for `KafkaWriter.write`, so I added them.
   - The parser integration tests now verify that all tuples were acked.
   - There is now a parser integration test that simulates the use case 
described in this Jira.  The `jsonMapQuery` parser integration test now 
produces multiple messages from a single tuple and sets the batch size to 5.
   - A test was added for `StormBulkWriterResponseHandler` that also simulates 
the use case described in this Jira.  Error handling tests that were originally 
in `BulkWriterComponentTest` were migrated here.
   
   ### Testing
   
   This has been tested in full dev both for regression and for the use case 
described in this Jira.  There are 3 test cases:
   
   - bro sensor with a batch size set to 5
   - snort sensor with a batch size set to 1
   - jsonMapQuery sensor that produces multiple messages from a tuple with a 
batch size set to both less than and greater than the number of messages 
   
   #### Setup
   1. Spin up full dev and verify data is landing in both Elasticsearch and 
HDFS.   
   2. Stop all sensors with `service sensor-stubs stop`.
   3. Create the jsonMapQuery sensor topic:
   ```
   /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper node1:2181 
--create --topic jsonMapQuery --partitions 1 --replication-factor 1
   ```
   4. Copy the jsonMapQuery sample data to full dev:
   ```
   cd /metron-deployment/development/centos6
   vagrant scp 
../../../metron-platform/metron-integration-test/src/main/sample/data/jsonMapQuery/raw/jsonMapExampleOutput
 /tmp
   ```
   5. Stop Metron Parsers in Ambari.
   6. Navigate to Ambari > Metron > Configs > Parsers and change "Metron 
Parsers" from `bro,snort,yaf` to `bro,snort,jsonMapQuery`.
   7. Start Metron Parsers in Ambari.
   8. Clear the Elasticsearch indices:
   ```
   curl -XDELETE http://node1:9200/bro_index*
   curl -XDELETE http://node1:9200/snort_index*
   ```
   9. Stop Metron Indexing in Ambari. 
   10. Clear the HDFS files:
   ```
   su hdfs -c "hdfs dfs -rm /apps/metron/indexing/indexed/bro/*"
   su hdfs -c "hdfs dfs -rm /apps/metron/indexing/indexed/snort/*"
   ```
   11. Start Metron Indexing in Ambari.
   
   12. Verify there is no bro or snort data in Elasticsearch or HDFS:
   ```
   [root@node1 vagrant]# curl -s -XGET 
http://node1:9200/_cat/indices?h=index,docs.count | grep bro
   [root@node1 vagrant]# curl -s -XGET 
http://node1:9200/_cat/indices?h=index,docs.count | grep snort
   [root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/bro/*.json 
| wc -l
   cat: `/apps/metron/indexing/indexed/bro/*.json': No such file or directory
   0
   [root@node1 vagrant]# hdfs dfs -cat 
/apps/metron/indexing/indexed/snort/*.json | wc -l
   cat: `/apps/metron/indexing/indexed/snort/*.json': No such file or directory
   0
   ```
   13. Each topology (parsing, enrichment, indexing) can be configured with a batch size and timeout.  The batch timeout defaults to 1/2 the tuple timeout, so the tests described next should be done reasonably quickly (before the timeout happens).  To make the test results easier to understand, we also need to set the batch sizes to 1 for the topologies we are not directly testing.  Set the bro parser batch size to 1:
   ```
   curl -X POST --header 'Content-Type: application/json' --header 'Accept: 
application/json' -d '{
     "parserClassName": "org.apache.metron.parsers.bro.BasicBroParser",
     "sensorTopic": "bro",
     "parserConfig": {
       "batchSize": 1
     }
   }' 'http://user:password@node1:8082/api/v1/sensor/parser/config/bro'
   ```
   14. Set the snort parser batch size to 1:
   ```
   curl -X POST --header 'Content-Type: application/json' --header 'Accept: 
application/json' -d '{
     "parserClassName": "org.apache.metron.parsers.snort.BasicSnortParser",
     "sensorTopic": "snort",
     "parserConfig": {
       "batchSize": 1
     }
   }' 'http://user:password@node1:8082/api/v1/sensor/parser/config/snort'
   ```
   15. Set the enrichment batch size to 1:
   ```
   curl -X POST --header 'Content-Type: application/json' --header 'Accept: 
application/json' -d '{
     "es.clustername": "metron",
     "es.ip": "node1:9200",
     "es.date.format": "yyyy.MM.dd.HH",
     "parser.error.topic": "indexing",
     "update.hbase.table": "metron_update",
     "update.hbase.cf": "t",
     "es.client.settings": {},
     "profiler.client.period.duration": "15",
     "profiler.client.period.duration.units": "MINUTES",
     "user.settings.hbase.table": "user_settings",
     "user.settings.hbase.cf": "cf",
     "bootstrap.servers": "node1:6667",
     "source.type.field": "source:type",
     "threat.triage.score.field": "threat:triage:score",
     "enrichment.writer.batchSize": "1",
     "profiler.writer.batchSize": "15",
     "profiler.writer.batchTimeout": "0",
     "geo.hdfs.file": "/apps/metron/geo/default/GeoLite2-City.tar.gz",
     "asn.hdfs.file": "/apps/metron/asn/default/GeoLite2-ASN.tar.gz"
   }' 'http://user:password@node1:8082/api/v1/global/config'
   ```
   16. Set the jsonMapQuery indexing batch size to 1 (since we will test this 
sensor in parsers):
   ```
   curl -X POST --header 'Content-Type: application/json' --header 'Accept: 
application/json' -d '{
     "hdfs": {
       "index": "jsonmapquery",
       "batchSize": 1,
       "enabled": true
     },
     "elasticsearch": {
       "index": "jsonmapquery",
       "batchSize": 1,
       "enabled": true
     },
     "solr": {
       "index": "jsonmapquery",
       "batchSize": 1,
       "enabled": false
     }
   }' 
'http://user:password@node1:8082/api/v1/sensor/indexing/config/jsonMapQuery'
   ```
   It can be helpful to keep a Kafka console consumer running for each topic:
   ```
   /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper 
node1:2181 --topic bro
   /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper 
node1:2181 --topic snort
   /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper 
node1:2181 --topic jsonMapQuery
   /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper 
node1:2181 --topic enrichments
   /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper 
node1:2181 --topic indexing
   ```
   #### bro
   1. Bro has a batch size of 5 in indexing by default.  Write 4 bro messages:
   ```
   shuf -n 4 /opt/sensor-stubs/data/bro.out | sed -e 
"s/\"ts\"\:[0-9]\+\./\"ts\"\:`date +%s`\./g" | 
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list 
node1:6667 --topic bro
   ```
   There should be no data in Elasticsearch or HDFS:
   ```
   [root@node1 vagrant]# curl -s -XGET 
http://node1:9200/_cat/indices?h=index,docs.count | grep bro
   [root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/bro/*.json 
| wc -l
   cat: `/apps/metron/indexing/indexed/bro/*.json': No such file or directory
   0
   ```
   2. Write an additional bro message:
   ```
   shuf -n 1 /opt/sensor-stubs/data/bro.out | sed -e 
"s/\"ts\"\:[0-9]\+\./\"ts\"\:`date +%s`\./g" | 
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list 
node1:6667 --topic bro
   ```
   There should now be 5 messages in Elasticsearch and HDFS:
   ```
   [root@node1 vagrant]# curl -s -XGET 
http://node1:9200/_cat/indices?h=index,docs.count | grep bro
   bro_index_2019.02.07.22    5
   [root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/bro/*.json 
| wc -l
   5
   ```
   
   #### snort
   1. Snort has a batch size of 1 in indexing by default.  Write a snort message:
   ```
   shuf -n 1 /opt/sensor-stubs/data/snort.out | sed -e "s/[^,]\+ ,/`date 
+'%m\/%d\/%y-%H:%M:%S'`.000000 ,/g" | 
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list 
node1:6667 --topic snort
   ```
   There should now be 1 message in Elasticsearch and HDFS:
   ```
   [root@node1 vagrant]# curl -s -XGET 
http://node1:9200/_cat/indices?h=index,docs.count | grep snort
   snort_index_2019.02.07.22    1
   [root@node1 vagrant]# hdfs dfs -cat 
/apps/metron/indexing/indexed/snort/*.json | wc -l
   1
   ```
   #### jsonMapQuery
   1. The jsonMapQuery sensor produces 10 messages for every tuple.  Set the 
batch size to 5 in the parser topology:
   ```
   curl -X POST --header 'Content-Type: application/json' --header 'Accept: 
application/json' -d '{
     "parserClassName": "org.apache.metron.parsers.json.JSONMapParser",
     "sensorTopic": "jsonMapQuery",
     "parserConfig": {
       "jsonpQuery": "$.foo",
       "batchSize": 5
     }
   }' 'http://user:password@node1:8082/api/v1/sensor/parser/config/jsonMapQuery'
   ```
   2. Write a jsonMapQuery message:
   ```
   cat /tmp/jsonMapExampleOutput | 
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list 
node1:6667 --topic jsonMapQuery
   ```
   This exercises the issue described in the Jira.  Previously, only a single message was written for each tuple.  With this PR there should be 10 messages in Elasticsearch and HDFS:
   ```
   [root@node1 vagrant]# curl -s -XGET 
http://node1:9200/_cat/indices?h=index,docs.count | grep jsonmapquery
   jsonmapquery_index_2019.02.07.22   10
   [root@node1 vagrant]# hdfs dfs -cat 
/apps/metron/indexing/indexed/jsonMapQuery/*.json | wc -l
   10
   ```
   3. Change the batch size to 15:
   ```
   curl -X POST --header 'Content-Type: application/json' --header 'Accept: 
application/json' -d '{
     "parserClassName": "org.apache.metron.parsers.json.JSONMapParser",
     "sensorTopic": "jsonMapQuery",
     "parserConfig": {
       "jsonpQuery": "$.foo",
       "batchSize": 15
     }
   }' 'http://user:password@node1:8082/api/v1/sensor/parser/config/jsonMapQuery'
   ```
   4. Write another jsonMapQuery message:
   ```
   cat /tmp/jsonMapExampleOutput | 
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list 
node1:6667 --topic jsonMapQuery
   ```
   There should still be 10 messages in Elasticsearch and HDFS (no additional 
messages written):
   ```
   [root@node1 vagrant]# curl -s -XGET 
http://node1:9200/_cat/indices?h=index,docs.count | grep jsonmapquery
   jsonmapquery_index_2019.02.07.22   10
   [root@node1 vagrant]# hdfs dfs -cat 
/apps/metron/indexing/indexed/jsonMapQuery/*.json | wc -l
   10
   ```
   5. Write another jsonMapQuery message:
   ```
   cat /tmp/jsonMapExampleOutput | 
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list 
node1:6667 --topic jsonMapQuery
   ```
   This should cause a batch to flush so we should see 15 additional messages:
   ```
   [root@node1 vagrant]# curl -s -XGET 
http://node1:9200/_cat/indices?h=index,docs.count | grep jsonmapquery
   jsonmapquery_index_2019.02.07.22   25
   [root@node1 vagrant]# hdfs dfs -cat 
/apps/metron/indexing/indexed/jsonMapQuery/*.json | wc -l
   25
   ```
   6. Write another jsonMapQuery message:
   ```
   cat /tmp/jsonMapExampleOutput | 
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list 
node1:6667 --topic jsonMapQuery
   ```
   This should cause another batch to flush since there were 5 messages still 
in the batch.  We should see another 15 messages added:
   ```
   [root@node1 vagrant]# curl -s -XGET 
http://node1:9200/_cat/indices?h=index,docs.count | grep jsonmapquery
   jsonmapquery_index_2019.02.07.22   40
   [root@node1 vagrant]# hdfs dfs -cat 
/apps/metron/indexing/indexed/jsonMapQuery/*.json | wc -l
   40
   ```
   
   The various topologies can be tested by adjusting their batch sizes and 
timeouts.  The previous instructions focus on testing batch sizes for bro in 
indexing, snort in indexing and jsonMapQuery in parsing.  Other topologies and 
scenarios can and should be tested with different batch sizes and timeouts.
   
   ### Feedback Requested
   
   Outside of the standard code review and functional testing, there are a few areas that are not 100% clear and I would like feedback on:
   
   - Is the fundamental architectural approach solid?  Are there any holes I'm 
not thinking of?
   - The message ids generated by the bolts and used to track messages in the 
writer classes are Java UUIDs.  Is this good enough?  Any id that uniquely 
identifies a message could be used here.  Is there something that would perform 
better? 
   - There is the potential for messages to pile up in `StormBulkWriterResponseHandler` (and probably in `BulkWriterComponent` too) when tuples time out and are replayed.  I think this is also an issue today, but should we explore some kind of cache that can evict messages after timeouts?
   - I have added javadocs and am planning on taking another pass and adding 
more.  Are there areas you feel need more/better explanation?  My goal is for 
these classes to be easy to understand and navigate.  Is the interaction 
between the bolts and response handler clear?
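   On the eviction-cache question above, one possible shape is a map of message ids to insertion times that drops entries older than a timeout.  This is purely a hypothetical sketch to frame the discussion, not part of this PR; all names here are illustrative:

   ```java
   import java.util.*;

   // Hypothetical eviction cache: entries older than the timeout are dropped,
   // so replayed tuples cannot accumulate stale messages indefinitely.
   public class EvictingMessageCache {
       private final long timeoutMillis;
       private final Map<String, Long> insertionTimes = new LinkedHashMap<>();

       public EvictingMessageCache(long timeoutMillis) {
           this.timeoutMillis = timeoutMillis;
       }

       public void add(String messageId, long nowMillis) {
           insertionTimes.put(messageId, nowMillis);
       }

       // Remove and return the ids whose entries have outlived the timeout.
       public List<String> evictExpired(long nowMillis) {
           List<String> expired = new ArrayList<>();
           Iterator<Map.Entry<String, Long>> it = insertionTimes.entrySet().iterator();
           while (it.hasNext()) {
               Map.Entry<String, Long> e = it.next();
               if (nowMillis - e.getValue() >= timeoutMillis) {
                   expired.add(e.getKey());
                   it.remove();
               }
           }
           return expired;
       }

       public int size() {
           return insertionTimes.size();
       }

       public static void main(String[] args) {
           EvictingMessageCache cache = new EvictingMessageCache(100);
           cache.add("m1", 0);
           cache.add("m2", 60);
           // m1 is 120ms old (expired); m2 is 60ms old (kept).
           System.out.println("expired at t=120: " + cache.evictExpired(120));
       }
   }
   ```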
   
   ## Pull Request Checklist
   
   Thank you for submitting a contribution to Apache Metron.  
   Please refer to our [Development 
Guidelines](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=61332235)
 for the complete guide to follow for contributions.  
   Please refer also to our [Build Verification 
Guidelines](https://cwiki.apache.org/confluence/display/METRON/Verifying+Builds?show-miniview)
 for complete smoke testing guides.  
   
   
   In order to streamline the review of the contribution we ask you follow 
these guidelines and ask you to double check the following:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? If not one needs to be 
created at [Metron 
Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
   - [x] Does your PR title start with METRON-XXXX where XXXX is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
   - [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?
   
   
   ### For code changes:
   - [x] Have you included steps to reproduce the behavior or problem that is 
being changed or addressed?
   - [x] Have you included steps or a guide to how the change may be verified 
and tested manually?
   - [x] Have you ensured that the full suite of tests and checks have been 
executed in the root metron folder via:
     ```
     mvn -q clean integration-test install && 
dev-utilities/build-utils/verify_licenses.sh 
     ```
   
   - [x] Have you written or updated unit tests and or integration tests to 
verify your changes?
   - [x] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [x] Have you verified the basic functionality of the build by building and 
running locally with Vagrant full-dev environment or the equivalent?
   
   ### For documentation related changes:
   - [x] Have you ensured that format looks appropriate for the output in which 
it is rendered by building and verifying the site-book? If not then run the 
following commands and the verify changes via 
`site-book/target/site/index.html`:
   
     ```
     cd site-book
     mvn site
     ```
   
   #### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.
   It is also recommended that [travis-ci](https://travis-ci.org) is set up for 
your personal repository such that your branches are built there before 
submitting a pull request.
   
