GitHub user nickwallen opened a pull request:

    https://github.com/apache/metron/pull/1097

    Kafka seek

    Adds `KAFKA_SEEK`, a Stellar function for use in the REPL that seeks to a 
specific offset within a Kafka topic and returns the message found there.  This 
complements the existing Kafka functions like `KAFKA_GET`, `KAFKA_TAIL`, and 
`KAFKA_FIND`.  Like those functions, it supports the "rich" message view.
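
As a rough illustration of the seek-by-offset behavior described above, here is a minimal Python sketch. It is not Metron's implementation and does not talk to Kafka: `TOPICS` and `seek` are hypothetical names, the topic is modeled as an in-memory list, and the returned record carries only a subset of the fields the "rich" view shows.

```python
from typing import Optional

# Hypothetical in-memory stand-in for Kafka: topic -> partition -> list of values.
# A value's position in the list plays the role of its offset, starting at 0.
TOPICS = {"indexing": {0: ["first", "second", "third"]}}


def seek(topic: str, partition: int, offset: int) -> Optional[dict]:
    """Return a 'rich'-style record for the given offset, or None if it does not exist."""
    messages = TOPICS.get(topic, {}).get(partition, [])
    if not 0 <= offset < len(messages):
        return None
    return {
        "topic": topic,
        "partition": partition,
        "offset": offset,
        "value": messages[offset],
    }


print(seek("indexing", 0, 0))   # a record for the first message
print(seek("indexing", 0, 99))  # no such offset
```

The point of the sketch is the contract: a valid offset yields the message together with its coordinates, while an offset that does not exist yields a null value rather than an error.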
    
    ## Testing
    
    1. Launch a development environment and run the REPL.
    
        ```
        [root@node1 ~]# source /etc/default/metron
        [root@node1 ~]# $METRON_HOME/bin/stellar -z $ZOOKEEPER
        Stellar, Go!
        ...
        ```
    
    1. Turn on the "rich" view so that we can compare offsets.
    
        ```
        [Stellar]>>> %define stellar.kafka.message.view := "rich"
        rich
        ```
    
    1. Take a look at the function docs.
    
        ```
        [Stellar]>>> ?KAFKA_SEEK
        KAFKA_SEEK
        Description: Seeks to an offset within a topic and returns the message.
    
        Arguments:
                topic - The name of the Kafka topic
                partition - The partition identifier; starts at 0.
                offset - The offset within the partition; starts at 0.
                config - Optional map of key/values that override any global properties.
    
        Returns: The message at the given offset, if the offset exists. Otherwise, returns null.
        ```
    
    1. Get a message using `KAFKA_GET`.    
    
        ```
        [Stellar]>>> KAFKA_GET("indexing")
        [{partition=0, offset=22022, topic=indexing, 
value={"adapter.threatinteladapter.end.ts":"1530982362426","bro_timestamp":"1530982357.858518","status_code":200,"ip_dst_port":80,"enrichmentsplitterbolt.splitter.end.ts":"1530982362416","enrichments.geo.ip_dst_addr.city":"Strasbourg","enrichments.geo.ip_dst_addr.latitude":"48.5839","enrichmentsplitterbolt.splitter.begin.ts":"1530982362415","adapter.hostfromjsonlistadapter.end.ts":"1530982362418","enrichments.geo.ip_dst_addr.country":"FR","enrichments.geo.ip_dst_addr.locID":"2973783","adapter.geoadapter.begin.ts":"1530982362418","enrichments.geo.ip_dst_addr.postalCode":"67100","uid":"CKF1qF1rGnRihXjk84","resp_mime_types":["application\/x-dosexec"],"trans_depth":1,"protocol":"http","original_string":"HTTP
 | id.orig_p:49189 status_code:200 method:GET request_body_len:0 id.resp_p:80 
uri:\/?b514ee6f0fe486009a6d83b035a4c0bd tags:[] uid:CKF1qF1rGnRihXjk84 
resp_mime_types:[\"application\\\/x-dosexec\"] trans_depth:1 host:62.75.195.236 
status_msg:OK id.orig_h:192.168.138.158 response_body_len:221184 
user_agent:Mozilla\/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; 
Trident\/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 
3.0.30729; Media Center PC 6.0) ts:1530982357.858518 id.resp_h:62.75.195.236 
resp_fuids:[\"FuXzMl1eXRxp2UBgvk\"]","ip_dst_addr":"62.75.195.236","threatinteljoinbolt.joiner.ts":"1530982362429","host":"62.75.195.236","enrichmentjoinbolt.joiner.ts":"1530982362421","adapter.hostfromjsonlistadapter.begin.ts":"1530982362418","threatintelsplitterbolt.splitter.begin.ts":"1530982362423","enrichments.geo.ip_dst_addr.longitude":"7.7455","ip_src_addr":"192.168.138.158","user_agent":"Mozilla\/4.0
 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident\/4.0; SLCC2; .NET CLR 
2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 
6.0)","resp_fuids":["FuXzMl1eXRxp2UBgvk"],"timestamp":1530982357858,"method":"GET","request_body_len":0,"uri":"\/?b514ee6f0fe486009a6d83b035a4c0bd","tags":[],"source.type":"bro","adapter.geoadapter.end.ts":"1530982362418","threatintelsplitterbolt.splitter.end.ts":"1530982362423","adapter.threatinteladapter.begin.ts":"1530982362426","ip_src_port":49189,"enrichments.geo.ip_dst_addr.location_point":"48.5839,7.7455","status_msg":"OK","guid":"bd3d53ff-2c6d-4b43-9f82-9846f9814f63","response_body_len":221184},
 key=null, timestamp=1530982362434}]
        ```
    
    1. Using the topic, partition, and offset returned by `KAFKA_GET`, retrieve 
the same message using `KAFKA_SEEK`.  Ensure that the value returned is the 
same.
    
        ```
        [Stellar]>>> KAFKA_SEEK("indexing", 0, 22022)
        {partition=0, offset=22022, topic=indexing, 
value={"adapter.threatinteladapter.end.ts":"1530982362426","bro_timestamp":"1530982357.858518","status_code":200,"ip_dst_port":80,"enrichmentsplitterbolt.splitter.end.ts":"1530982362416","enrichments.geo.ip_dst_addr.city":"Strasbourg","enrichments.geo.ip_dst_addr.latitude":"48.5839","enrichmentsplitterbolt.splitter.begin.ts":"1530982362415","adapter.hostfromjsonlistadapter.end.ts":"1530982362418","enrichments.geo.ip_dst_addr.country":"FR","enrichments.geo.ip_dst_addr.locID":"2973783","adapter.geoadapter.begin.ts":"1530982362418","enrichments.geo.ip_dst_addr.postalCode":"67100","uid":"CKF1qF1rGnRihXjk84","resp_mime_types":["application\/x-dosexec"],"trans_depth":1,"protocol":"http","original_string":"HTTP
 | id.orig_p:49189 status_code:200 method:GET request_body_len:0 id.resp_p:80 
uri:\/?b514ee6f0fe486009a6d83b035a4c0bd tags:[] uid:CKF1qF1rGnRihXjk84 
resp_mime_types:[\"application\\\/x-dosexec\"] trans_depth:1 host:62.75.195.236 
status_msg:OK id.orig_h:192.168.138.158 response_body_len:221184 
user_agent:Mozilla\/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; 
Trident\/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 
3.0.30729; Media Center PC 6.0) ts:1530982357.858518 id.resp_h:62.75.195.236 
resp_fuids:[\"FuXzMl1eXRxp2UBgvk\"]","ip_dst_addr":"62.75.195.236","threatinteljoinbolt.joiner.ts":"1530982362429","host":"62.75.195.236","enrichmentjoinbolt.joiner.ts":"1530982362421","adapter.hostfromjsonlistadapter.begin.ts":"1530982362418","threatintelsplitterbolt.splitter.begin.ts":"1530982362423","enrichments.geo.ip_dst_addr.longitude":"7.7455","ip_src_addr":"192.168.138.158","user_agent":"Mozilla\/4.0
 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident\/4.0; SLCC2; .NET CLR 
2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 
6.0)","resp_fuids":["FuXzMl1eXRxp2UBgvk"],"timestamp":1530982357858,"method":"GET","request_body_len":0,"uri":"\/?b514ee6f0fe486009a6d83b035a4c0bd","tags":[],"source.type":"bro","adapter.geoadapter.end.ts":"1530982362418","threatintelsplitterbolt.splitter.end.ts":"1530982362423","adapter.threatinteladapter.begin.ts":"1530982362426","ip_src_port":49189,"enrichments.geo.ip_dst_addr.location_point":"48.5839,7.7455","status_msg":"OK","guid":"bd3d53ff-2c6d-4b43-9f82-9846f9814f63","response_body_len":221184},
 key=null, timestamp=1530982362434}
        ```
    
    1. Grab the first message in the partition by seeking to offset 0.
    
        ```
        [Stellar]>>> KAFKA_SEEK("indexing", 0, 0)
        {partition=0, offset=0, topic=indexing, 
value={"adapter.threatinteladapter.end.ts":"1530978697769","qclass_name":"qclass-32769","bro_timestamp":"1530978687.836793","qtype_name":"PTR","ip_dst_port":5353,"enrichmentsplitterbolt.splitter.end.ts":"1530978696551","qtype":12,"rejected":false,"enrichmentsplitterbolt.splitter.begin.ts":"1530978696550","adapter.hostfromjsonlistadapter.end.ts":"1530978696606","trans_id":0,"adapter.geoadapter.begin.ts":"1530978696857","uid":"CGs8rS1rqhyXRRgA64","protocol":"dns","original_string":"DNS
 | AA:false qclass_name:qclass-32769 id.orig_p:5353 qtype_name:PTR qtype:12 
rejected:false id.resp_p:5353 query:_googlecast._tcp.local trans_id:0 TC:false 
RA:false uid:CGs8rS1rqhyXRRgA64 RD:false proto:udp id.orig_h:192.168.66.1 Z:0 
qclass:32769 ts:1530978687.836793 
id.resp_h:224.0.0.251","ip_dst_addr":"224.0.0.251","threatinteljoinbolt.joiner.ts":"1530978697808","enrichmentjoinbolt.joiner.ts":"1530978696932","adapter.hostfromjsonlistadapter.begin.ts":"1530978696606","threatintelsplitterbolt.splitter.begin.ts":"1530978696949","Z":0,"ip_src_addr":"192.168.66.1","qclass":32769,"timestamp":1530978687836,"AA":false,"query":"_googlecast._tcp.local","TC":false,"RA":false,"source.type":"bro","adapter.geoadapter.end.ts":"1530978696857","RD":false,"threatintelsplitterbolt.splitter.end.ts":"1530978696952","adapter.threatinteladapter.begin.ts":"1530978697764","ip_src_port":5353,"proto":"udp","guid":"90751ce5-703d-4b9f-8c2d-8e5c42e72262"},
 key=null, timestamp=1530978697856}
        ```
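
The comparison in the steps above (the record from `KAFKA_GET` against the one from `KAFKA_SEEK`) is done by eye in the REPL. A minimal Python sketch of the same check follows, assuming the two "rich" records have already been copied into dictionaries. The helper `same_message` is hypothetical, and the `value` strings are abbreviated to two fields taken from the output above.

```python
import json


def same_message(got: dict, sought: dict) -> bool:
    """Compare the coordinates exactly and the values as parsed JSON."""
    for field in ("topic", "partition", "offset"):
        if got[field] != sought[field]:
            return False
    # Parsing the JSON makes the comparison independent of key order.
    return json.loads(got["value"]) == json.loads(sought["value"])


# KAFKA_GET returns a list of records, so take its first element.
kafka_get_result = [{
    "topic": "indexing", "partition": 0, "offset": 22022,
    "value": '{"guid":"bd3d53ff-2c6d-4b43-9f82-9846f9814f63","source.type":"bro"}',
}]
from_get = kafka_get_result[0]

# KAFKA_SEEK returns a single record.
from_seek = {
    "topic": "indexing", "partition": 0, "offset": 22022,
    "value": '{"source.type":"bro","guid":"bd3d53ff-2c6d-4b43-9f82-9846f9814f63"}',
}

print(same_message(from_get, from_seek))
```

Comparing the parsed values rather than the raw strings is a design choice for the sketch; in the transcript above the two value strings are identical, so a plain string comparison would work equally well there.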
    
    ## Pull Request Checklist
    
    - [ ] Is there a JIRA ticket associated with this PR? If not one needs to 
be created at [Metron 
Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
    - [ ] Does your PR title start with METRON-XXXX where XXXX is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
    - [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?
    - [ ] Have you included steps to reproduce the behavior or problem that is 
being changed or addressed?
    - [ ] Have you included steps or a guide to how the change may be verified 
and tested manually?
    - [ ] Have you ensured that the full suite of tests and checks have been 
executed in the root metron folder via:
    - [ ] Have you written or updated unit tests and/or integration tests to 
verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [ ] Have you verified the basic functionality of the build by building 
and running locally with Vagrant full-dev environment or the equivalent?


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nickwallen/metron KAFKA_SEEK

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/metron/pull/1097.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1097
    
----
commit 9361f05fa2644275d89431d46974a2412dc76417
Author: Nick Allen <nick@...>
Date:   2018-07-06T19:37:54Z

    Implemented KAFKA_SEEK function

commit b153441cdc2418770cfb2fd2d51e0bba31f1c5d1
Author: Nick Allen <nick@...>
Date:   2018-07-07T17:07:00Z

    Improved function docs

----

