GitHub user nickwallen opened a pull request:
https://github.com/apache/metron/pull/1097
Kafka seek
A Stellar function to be used in the REPL that allows you to seek to a
specific offset within a Kafka topic. This complements the existing Kafka
functions like `KAFKA_GET`, `KAFKA_TAIL`, and `KAFKA_FIND`. Like those
functions, this supports the "rich" message view.
## Testing
1. Launch a development environment and run the REPL.
```
[root@node1 ~]# source /etc/default/metron
[root@node1 ~]# $METRON_HOME/bin/stellar -z $ZOOKEEPER
Stellar, Go!
...
```
1. Turn on the "rich" view so that we can compare offsets.
```
[Stellar]>>> %define stellar.kafka.message.view := "rich"
rich
```
1. Take a look at the function docs.
```
[Stellar]>>> ?KAFKA_SEEK
KAFKA_SEEK
Description: Seeks to an offset within a topic and returns the message.
Arguments:
topic - The name of the Kafka topic
partition - The partition identifier; starts at 0.
offset - The offset within the partition; starts at 0.
config - Optional map of key/values that override any global
properties.
Returns: The message at the given offset, if the offset exists.
Otherwise, returns null.
```
1. Get a message using `KAFKA_GET`.
```
[Stellar]>>> KAFKA_GET("indexing")
[{partition=0, offset=22022, topic=indexing,
value={"adapter.threatinteladapter.end.ts":"1530982362426","bro_timestamp":"1530982357.858518","status_code":200,"ip_dst_port":80,"enrichmentsplitterbolt.splitter.end.ts":"1530982362416","enrichments.geo.ip_dst_addr.city":"Strasbourg","enrichments.geo.ip_dst_addr.latitude":"48.5839","enrichmentsplitterbolt.splitter.begin.ts":"1530982362415","adapter.hostfromjsonlistadapter.end.ts":"1530982362418","enrichments.geo.ip_dst_addr.country":"FR","enrichments.geo.ip_dst_addr.locID":"2973783","adapter.geoadapter.begin.ts":"1530982362418","enrichments.geo.ip_dst_addr.postalCode":"67100","uid":"CKF1qF1rGnRihXjk84","resp_mime_types":["application\/x-dosexec"],"trans_depth":1,"protocol":"http","original_string":"HTTP
| id.orig_p:49189 status_code:200 method:GET request_body_len:0 id.resp_p:80
uri:\/?b514ee6f0fe486009a6d83b035a4c0bd tags:[] uid:CKF1qF1rGnRihXjk84
resp_mime_types:[\"application\\\/x-dosexec\"] trans_depth:1 host:62.75.195.236
status_msg:OK id.orig_h:192.168.138.158 response_body_len:221184
user_agent:Mozilla\/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64;
Trident\/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR
3.0.30729; Media Center PC 6.0) ts:1530982357.858518 id.resp_h:62.75.195.236
resp_fuids:[\"FuXzMl1eXRxp2UBgvk\"]","ip_dst_addr":"62.75.195.236","threatinteljoinbolt.joiner.ts":"1530982362429","host":"62.75.195.236","enrichmentjoinbolt.joiner.ts":"1530982362421","adapter.hostfromjsonlistadapter.begin.ts":"1530982362418","threatintelsplitterbolt.splitter.begin.ts":"1530982362423","enrichments.geo.ip_dst_addr.longitude":"7.7455","ip_src_addr":"192.168.138.158","user_agent":"Mozilla\/4.0
(compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident\/4.0; SLCC2; .NET CLR
2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC
6.0)","resp_fuids":["FuXzMl1eXRxp2UBgvk"],"timestamp":1530982357858,"method":"GET","request_body_len":0,"uri":"\/?b514ee6f0fe486009a6d83b035a4c0bd","tags":[],"source.type":"bro","adapter.geoadapter.end.ts":"1530982362418","threatintelsplitterbolt.splitter.end.ts":"1530982362423","adapter.threatinteladapter.begin.ts":"1530982362426","ip_src_port":49189,"enrichments.geo.ip_dst_addr.location_point":"48.5839,7.7455","status_msg":"OK","guid":"bd3d53ff-2c6d-4b43-9f82-9846f9814f63","response_body_len":221184},
key=null, timestamp=1530982362434}]
```
1. Using the topic, partition, and offset returned by `KAFKA_GET`, retrieve
the same message using `KAFKA_SEEK`. Ensure that the value returned is the
same.
```
[Stellar]>>> KAFKA_SEEK("indexing", 0, 22022)
{partition=0, offset=22022, topic=indexing,
value={"adapter.threatinteladapter.end.ts":"1530982362426","bro_timestamp":"1530982357.858518","status_code":200,"ip_dst_port":80,"enrichmentsplitterbolt.splitter.end.ts":"1530982362416","enrichments.geo.ip_dst_addr.city":"Strasbourg","enrichments.geo.ip_dst_addr.latitude":"48.5839","enrichmentsplitterbolt.splitter.begin.ts":"1530982362415","adapter.hostfromjsonlistadapter.end.ts":"1530982362418","enrichments.geo.ip_dst_addr.country":"FR","enrichments.geo.ip_dst_addr.locID":"2973783","adapter.geoadapter.begin.ts":"1530982362418","enrichments.geo.ip_dst_addr.postalCode":"67100","uid":"CKF1qF1rGnRihXjk84","resp_mime_types":["application\/x-dosexec"],"trans_depth":1,"protocol":"http","original_string":"HTTP
| id.orig_p:49189 status_code:200 method:GET request_body_len:0 id.resp_p:80
uri:\/?b514ee6f0fe486009a6d83b035a4c0bd tags:[] uid:CKF1qF1rGnRihXjk84
resp_mime_types:[\"application\\\/x-dosexec\"] trans_depth:1 host:62.75.195.236
status_msg:OK id.orig_h:192.168.138.158 response_body_len:221184
user_agent:Mozilla\/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64;
Trident\/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR
3.0.30729; Media Center PC 6.0) ts:1530982357.858518 id.resp_h:62.75.195.236
resp_fuids:[\"FuXzMl1eXRxp2UBgvk\"]","ip_dst_addr":"62.75.195.236","threatinteljoinbolt.joiner.ts":"1530982362429","host":"62.75.195.236","enrichmentjoinbolt.joiner.ts":"1530982362421","adapter.hostfromjsonlistadapter.begin.ts":"1530982362418","threatintelsplitterbolt.splitter.begin.ts":"1530982362423","enrichments.geo.ip_dst_addr.longitude":"7.7455","ip_src_addr":"192.168.138.158","user_agent":"Mozilla\/4.0
(compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident\/4.0; SLCC2; .NET CLR
2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC
6.0)","resp_fuids":["FuXzMl1eXRxp2UBgvk"],"timestamp":1530982357858,"method":"GET","request_body_len":0,"uri":"\/?b514ee6f0fe486009a6d83b035a4c0bd","tags":[],"source.type":"bro","adapter.geoadapter.end.ts":"1530982362418","threatintelsplitterbolt.splitter.end.ts":"1530982362423","adapter.threatinteladapter.begin.ts":"1530982362426","ip_src_port":49189,"enrichments.geo.ip_dst_addr.location_point":"48.5839,7.7455","status_msg":"OK","guid":"bd3d53ff-2c6d-4b43-9f82-9846f9814f63","response_body_len":221184},
key=null, timestamp=1530982362434}
```
1. Grab the first message.
```
[Stellar]>>> KAFKA_SEEK("indexing", 0, 0)
{partition=0, offset=0, topic=indexing,
value={"adapter.threatinteladapter.end.ts":"1530978697769","qclass_name":"qclass-32769","bro_timestamp":"1530978687.836793","qtype_name":"PTR","ip_dst_port":5353,"enrichmentsplitterbolt.splitter.end.ts":"1530978696551","qtype":12,"rejected":false,"enrichmentsplitterbolt.splitter.begin.ts":"1530978696550","adapter.hostfromjsonlistadapter.end.ts":"1530978696606","trans_id":0,"adapter.geoadapter.begin.ts":"1530978696857","uid":"CGs8rS1rqhyXRRgA64","protocol":"dns","original_string":"DNS
| AA:false qclass_name:qclass-32769 id.orig_p:5353 qtype_name:PTR qtype:12
rejected:false id.resp_p:5353 query:_googlecast._tcp.local trans_id:0 TC:false
RA:false uid:CGs8rS1rqhyXRRgA64 RD:false proto:udp id.orig_h:192.168.66.1 Z:0
qclass:32769 ts:1530978687.836793
id.resp_h:224.0.0.251","ip_dst_addr":"224.0.0.251","threatinteljoinbolt.joiner.ts":"1530978697808","enrichmentjoinbolt.joiner.ts":"1530978696932","adapter.hostfromjsonlistadapter.begin.ts":"1530978696606","threatintelsplitterbolt.splitter.begin.ts":"1530978696949","Z":0,"ip_src_addr":"192.168.66.1","qclass":32769,"timestamp":1530978687836,"AA":false,"query":"_googlecast._tcp.local","TC":false,"RA":false,"source.type":"bro","adapter.geoadapter.end.ts":"1530978696857","RD":false,"threatintelsplitterbolt.splitter.end.ts":"1530978696952","adapter.threatinteladapter.begin.ts":"1530978697764","ip_src_port":5353,"proto":"udp","guid":"90751ce5-703d-4b9f-8c2d-8e5c42e72262"},
key=null, timestamp=1530978697856}
```
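
For reviewers who want the contract from the function docs in one place, here is a toy, in-memory sketch of the behavior the steps above exercise: offsets are per-partition and start at 0, and a seek to an offset that does not exist returns null. This is not the code in this PR and it does not talk to Kafka. The class `SeekSketch` and its `append` and `seek` methods are hypothetical names used only for illustration, and the returned map carries only a subset of the "rich" view fields (no `key` or `timestamp`).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of the documented KAFKA_SEEK contract (hypothetical, not Metron code). */
public class SeekSketch {

    // topic -> list of partitions -> list of values; the list index is the offset
    private final Map<String, List<List<String>>> topics = new HashMap<>();

    /** Appends a value to a partition and returns the offset it was assigned. */
    public long append(String topic, int partition, String value) {
        List<List<String>> partitions = topics.computeIfAbsent(topic, t -> new ArrayList<>());
        while (partitions.size() <= partition) {
            partitions.add(new ArrayList<>());
        }
        List<String> log = partitions.get(partition);
        log.add(value);
        return log.size() - 1;
    }

    /** Returns a "rich"-style view of the message, or null if the offset does not exist. */
    public Map<String, Object> seek(String topic, int partition, long offset) {
        List<List<String>> partitions = topics.get(topic);
        if (partitions == null || partition < 0 || partition >= partitions.size()) {
            return null;
        }
        List<String> log = partitions.get(partition);
        if (offset < 0 || offset >= log.size()) {
            return null;
        }
        // Insertion order mirrors the field order seen in the REPL output.
        Map<String, Object> rich = new LinkedHashMap<>();
        rich.put("partition", partition);
        rich.put("offset", offset);
        rich.put("topic", topic);
        rich.put("value", log.get((int) offset));
        return rich;
    }

    public static void main(String[] args) {
        SeekSketch kafka = new SeekSketch();
        kafka.append("indexing", 0, "first");
        kafka.append("indexing", 0, "second");
        System.out.println(kafka.seek("indexing", 0, 0));  // first message in partition 0
        System.out.println(kafka.seek("indexing", 0, 99)); // offset does not exist
    }
}
```

The two calls in `main` correspond to the "grab the first message" step and to the "Otherwise, returns null" case from the function docs, which the manual steps do not cover.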
## Pull Request Checklist
- [ ] Is there a JIRA ticket associated with this PR? If not one needs to
be created at [Metron
Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
- [ ] Does your PR title start with METRON-XXXX where XXXX is the JIRA
number you are trying to resolve? Pay particular attention to the hyphen "-"
character.
- [ ] Has your PR been rebased against the latest commit within the target
branch (typically master)?
- [ ] Have you included steps to reproduce the behavior or problem that is
being changed or addressed?
- [ ] Have you included steps or a guide to how the change may be verified
and tested manually?
- [ ] Have you ensured that the full suite of tests and checks have been
executed in the root metron folder via:
- [ ] Have you written or updated unit tests and/or integration tests to
verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies
licensed in a way that is compatible for inclusion under [ASF
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] Have you verified the basic functionality of the build by building
and running locally with Vagrant full-dev environment or the equivalent?
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/nickwallen/metron KAFKA_SEEK
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/metron/pull/1097.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1097
----
commit 9361f05fa2644275d89431d46974a2412dc76417
Author: Nick Allen <nick@...>
Date: 2018-07-06T19:37:54Z
Implemented KAFKA_SEEK function
commit b153441cdc2418770cfb2fd2d51e0bba31f1c5d1
Author: Nick Allen <nick@...>
Date: 2018-07-07T17:07:00Z
Improved function docs
----
---