Github user cestella commented on the issue:

    https://github.com/apache/incubator-metron/pull/450

Testing Instructions beyond the normal smoke test (i.e. letting data flow through to the indices and checking them).

## Free Up Space on the virtual machine

First, let's free up some headroom on the virtual machine. If you are running this on a multinode cluster, you do not need to do this.
* Kill monit via `service monit stop`
* Kill tcpreplay via `for i in $(ps -ef | grep tcpreplay | awk '{print $2}');do kill -9 $i;done`
* Kill existing parser topologies via
  * `storm kill snort`
  * `storm kill bro`
* Kill yaf via `for i in $(ps -ef | grep yaf | awk '{print $2}');do kill -9 $i;done`
* Kill bro via `for i in $(ps -ef | grep bro | awk '{print $2}');do kill -9 $i;done`

## Preliminaries

* Set an environment variable to indicate `METRON_HOME`: `export METRON_HOME=/usr/metron/0.3.1`
* Create the profiler hbase table via `echo "create 'profiler', 'P'" | hbase shell`
* Open `~/rand_gen.py` and paste the following:
```
#!/usr/bin/python
import random
import sys
import time

def main():
  # Arguments: mean, standard deviation, seconds to sleep between messages
  mu = float(sys.argv[1])
  sigma = float(sys.argv[2])
  freq_s = int(sys.argv[3])
  while True:
    # Emit one JSON map per line with a Gaussian-distributed `value` field
    out = '{ "value" : ' + str(random.gauss(mu, sigma)) + ' }'
    # print(...) with a single argument works on both Python 2 and Python 3
    print(out)
    sys.stdout.flush()
    time.sleep(freq_s)

if __name__ == '__main__':
  main()
```
This will generate random JSON maps with a numeric field called `value`.
* Set the profiler to use 1 minute tick durations:
  * Edit `$METRON_HOME/config/profiler.properties` to adjust the capture duration by changing `profiler.period.duration=15` to `profiler.period.duration=1`
  * Edit `$METRON_HOME/config/zookeeper/global.json` and add the following properties:
```
"profiler.client.period.duration" : "1",
"profiler.client.period.duration.units" : "MINUTES"
```

## Deploy the custom parser

* Edit the value parser config at `$METRON_HOME/config/zookeeper/parsers/value.json`:
```
{
  "parserClassName":"org.apache.metron.parsers.json.JSONMapParser",
  "sensorTopic":"value",
  "fieldTransformations" : [
    {
      "transformation" : "STELLAR"
     ,"output" : [ "num_profiles_parser", "mean_parser" ]
     ,"config" : {
        "num_profiles_parser" : "LENGTH(PROFILE_GET('stat', 'global', PROFILE_WINDOW('5 minute window every 10 minutes starting from 2 minutes ago until 32 minutes ago excluding holidays:us')))",
        "mean_parser" : "STATS_MEAN(STATS_MERGE(PROFILE_GET('stat', 'global', PROFILE_WINDOW('5 minute window every 10 minutes starting from 2 minutes ago until 32 minutes ago excluding holidays:us'))))"
      }
    }
  ]
}
```
* Edit the value enrichment config at `$METRON_HOME/config/zookeeper/enrichments/value.json`:
```
{
  "enrichment" : {
    "fieldMap": {
      "stellar" : {
        "config" : {
          "num_profiles_enrichment" : "LENGTH(PROFILE_GET('stat', 'global', PROFILE_WINDOW('5 minute window every 10 minutes starting from 2 minutes ago until 32 minutes ago excluding holidays:us')))",
          "mean_enrichment" : "STATS_MEAN(STATS_MERGE(PROFILE_GET('stat', 'global', PROFILE_WINDOW('5 minute window every 10 minutes starting from 2 minutes ago until 32 minutes ago excluding holidays:us'))))"
        }
      }
    }
  }
}
```
* Create the value kafka topic: `/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper node1:2181 --create --topic value --partitions 1 --replication-factor 1`
* Push the configs via `$METRON_HOME/bin/zk_load_configs.sh -m PUSH -i $METRON_HOME/config/zookeeper -z node1:2181`
* Start the parser topology via `$METRON_HOME/bin/start_parser_topology.sh -k node1:6667 -z node1:2181 -s value`

## Start the profiler

* Edit `$METRON_HOME/config/zookeeper/profiler.json` and paste in the following:
```
{
  "profiles": [
    {
      "profile": "stat",
      "foreach": "'global'",
      "onlyif": "true",
      "init" : {
      },
      "update": {
        "s": "STATS_ADD(s, value)"
      },
      "result": "s"
    }
  ]
}
```
* Start the profiler via `$METRON_HOME/bin/start_profiler_topology.sh`

## Test Case

* Set up a profile to accept some synthetic data with a numeric `value` field and persist a stats summary of the data
* Send some synthetic data to the `value` kafka topic: `python ~/rand_gen.py 0 1 1 | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic value`
* Wait for at least 32 minutes and execute the following via the Stellar REPL:
```
# Grab the profiles from 1 minute ago to 8 minutes ago
LENGTH(PROFILE_GET('stat', 'global', PROFILE_WINDOW('from 1 minute ago to 8 minutes ago')))
# Looks like 7 were returned, great.  Now try something more complex
# Grab the profiles in 5 minute windows every 10 minutes from 2 minutes ago to 32 minutes ago:
#  32 minutes ago til 27 minutes ago should be 5 profiles
#  22 minutes ago til 17 minutes ago should be 5 profiles
#  12 minutes ago til 7 minutes ago should be 5 profiles
# for a total of 15 profiles
LENGTH(PROFILE_GET('stat', 'global', PROFILE_WINDOW('5 minute window every 10 minutes starting from 2 minutes ago until 32 minutes ago excluding holidays:us')))
```
For me, the following was the result:
```
Stellar, Go!
Please note that functions are loading lazily in the background and will be unavailable until loaded fully.
{es.clustername=metron, es.ip=node1, es.port=9300, es.date.format=yyyy.MM.dd.HH, profiler.client.period.duration=1, profiler.client.period.duration.units=MINUTES}
[Stellar]>>> # Grab the profiles from 1 minute ago to 8 minutes ago
[Stellar]>>> LENGTH(PROFILE_GET('stat', 'global', PROFILE_WINDOW('from 1 minute ago to 8 minutes ago')))
Functions loaded, you may refer to functions now...
7
[Stellar]>>> # Looks like 7 were returned, great.
[Stellar]>>> # Grab the profiles from 2 minutes ago to 32 minutes ago
[Stellar]>>> LENGTH(PROFILE_GET('stat', 'global', PROFILE_WINDOW('from 2 minutes ago to 32 minutes ago')))
30
[Stellar]>>> # Looks like 30 were returned, great.
[Stellar]>>> # Now try something more complex
[Stellar]>>> # Grab the profiles in 5 minute windows every 10 minutes from 2 minutes ago to 32 minutes ago:
[Stellar]>>> #  32 minutes ago til 27 minutes ago should be 5 profiles
[Stellar]>>> #  22 minutes ago til 17 minutes ago should be 5 profiles
[Stellar]>>> #  12 minutes ago til 7 minutes ago should be 5 profiles
[Stellar]>>> # for a total of 15 profiles
[Stellar]>>> LENGTH(PROFILE_GET('stat', 'global', PROFILE_WINDOW('5 minute window every 10 minutes starting from 2 minutes ago until 32 minutes ago excluding holidays:us')))
15
[Stellar]>>> STATS_MEAN(STATS_MERGE(PROFILE_GET('stat', 'global', PROFILE_WINDOW('5 minute window every 10 minutes starting from 2 minutes ago until 32 minutes ago excluding holidays:us'))))
0.028019658637877063
[Stellar]>>>
```
* Delete any existing value index via `curl -XDELETE "http://localhost:9200/value*"`
* Wait for a couple of seconds and run
  * `curl "http://localhost:9200/value*/_search?pretty=true&q=*:*" 2> /dev/null`
* You should see values in the index with non-zero fields:
  * `num_profiles_enrichment` should be 15
  * `num_profiles_parser` should be 15
  * `mean_enrichment` should be a non-zero double
  * `mean_parser` should be a non-zero double

For reference, a sample message for me is:
```
"_index" : "value_index_2017.02.17.18",
"_type" : "value_doc",
"_id" : "AVpNPI8JQV00TRR_I4zn",
"_score" : 1.0,
"_source" : {
  "adapter:stellaradapter:end:ts" : "1487354498620",
  "threatinteljoinbolt:joiner:ts" : "1487354498628",
  "enrichmentsplitterbolt:splitter:end:ts" : "1487354498576",
  "num_profiles_parser" : 15,
  "enrichmentsplitterbolt:splitter:begin:ts" : "1487354498571",
  "enrichmentjoinbolt:joiner:ts" : "1487354498622",
  "mean_enrichment" : 0.025770908095283665,
  "adapter:stellaradapter:begin:ts" : "1487354498578",
  "source:type" : "value",
  "original_string" : "{ \"value\" : -0.274471660322 }",
  "threatintelsplitterbolt:splitter:begin:ts" : "1487354498625",
  "num_profiles_enrichment" : 15,
  "threatintelsplitterbolt:splitter:end:ts" : "1487354498625",
  "value" : -0.274471660322,
  "mean_parser" : 0.025770908095283665,
  "timestamp" : 1487354498529
}
```

Here we've validated that the new window function can be called from the relevant topologies as well as from the REPL, and that it gives consistent results that make sense.
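The expected count of 15 for the window `5 minute window every 10 minutes starting from 2 minutes ago until 32 minutes ago` can be checked by hand. The following is only a rough sketch of that arithmetic, not Metron's implementation: it assumes one profile per 1-minute period, that windows are laid out from the oldest bound (32 minutes ago) forward, and that a window is counted only if it fits entirely inside the range. The function name `window_offsets` is hypothetical and is not part of Metron or Stellar.

```python
def window_offsets(start_ago, end_ago, width, every):
    """Return (from_ago, to_ago) offsets in minutes for each window.

    Assumption: windows begin at the oldest bound (start_ago), repeat
    every `every` minutes, and are kept only if they end at or before
    the most recent bound (end_ago).
    """
    windows = []
    begin = start_ago
    while begin - width >= end_ago:
        windows.append((begin, begin - width))
        begin -= every
    return windows


# 5 minute windows every 10 minutes, between 32 and 2 minutes ago
windows = window_offsets(start_ago=32, end_ago=2, width=5, every=10)

# With 1-minute profile periods, each window contributes `width` profiles
total_profiles = sum(begin - end for begin, end in windows)

print(windows)         # [(32, 27), (22, 17), (12, 7)]
print(total_profiles)  # 15
```

The three windows match the ones listed in the REPL comments (32 to 27, 22 to 17, and 12 to 7 minutes ago), which is where the expected value of 15 for `num_profiles_parser` and `num_profiles_enrichment` comes from.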