[
https://issues.apache.org/jira/browse/METRON-1533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462387#comment-16462387
]
ASF GitHub Bot commented on METRON-1533:
----------------------------------------
Github user nickwallen commented on the issue:
https://github.com/apache/metron/pull/1000
Thanks for taking it for a test drive. I think all your observations are
explainable, but they all point out usability issues that I think I can improve
on.
#### 1. Offsets
`KAFKA_FIND` 'sticks' on its consumer offset. It operates more like
`KAFKA_GET` than `KAFKA_TAIL`. This is how I described it in the docs.
> Finds messages that satisfy a given filter expression. Subsequent calls
will continue retrieving messages sequentially from the original offset.
When you first run `KAFKA_FIND`, its consumer offset will not be set. It
will pick up from the end of the topic. When you run it again in the same
session, it will continue filtering from those same offsets, rather than going
to the end of the topic.
The `kafka-console-consumer` tool always seeks to the end when it is run.
In your test, it's likely that `kafka-console-consumer` and `KAFKA_FIND` are at
completely different offsets as you try to compare the two.
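To make the difference concrete, here is a rough in-memory sketch of the two behaviors. It is not Metron or Kafka client code; `TopicModel`, `StickyFinder`, `SeekToEndFinder`, and the `arriving` parameter are all hypothetical names used only for illustration, and the topic is modeled as a single-partition list.

```python
from typing import Callable, List, Optional, Sequence


class TopicModel:
    """In-memory stand-in for a single-partition topic (not a Kafka client)."""

    def __init__(self) -> None:
        self.log: List[str] = []

    def append(self, *messages: str) -> None:
        self.log.extend(messages)

    def end_offset(self) -> int:
        return len(self.log)


class StickyFinder:
    """Starts at the end of the topic on first use, then resumes where it stopped."""

    def __init__(self, topic: TopicModel) -> None:
        self.topic = topic
        self.offset: Optional[int] = None  # unset until the first call

    def find(self, predicate: Callable[[str], bool],
             arriving: Sequence[str] = ()) -> List[str]:
        if self.offset is None:
            self.offset = self.topic.end_offset()
        self.topic.append(*arriving)  # messages that show up while we poll
        batch = self.topic.log[self.offset:]
        self.offset = self.topic.end_offset()
        return [m for m in batch if predicate(m)]


class SeekToEndFinder:
    """Seeks to the end on every call, so it only sees messages arriving during the call."""

    def __init__(self, topic: TopicModel) -> None:
        self.topic = topic

    def find(self, predicate: Callable[[str], bool],
             arriving: Sequence[str] = ()) -> List[str]:
        offset = self.topic.end_offset()
        self.topic.append(*arriving)
        return [m for m in self.topic.log[offset:] if predicate(m)]


def is_bro(message: str) -> bool:
    return message.startswith("bro")


topic = TopicModel()
topic.append("bro-0")  # written before either consumer exists
sticky = StickyFinder(topic)
tail = SeekToEndFinder(topic)

first = sticky.find(is_bro, arriving=["bro-1", "snort-1"])  # starts at the end, skips bro-0
topic.append("bro-2", "bro-3")                              # arrive between calls
second = sticky.find(is_bro, arriving=["bro-4"])            # resumes from its saved offset
topic.append("bro-5")                                       # backlog the tail-style call skips
latest = tail.find(is_bro, arriving=["bro-6"])
```

In this model the second sticky call returns the backlog that built up between calls, while the seek-to-end call returns only what arrived while it was polling, which is why the two tools can disagree when compared side by side.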
I had actually already been working on a version of this that always seeks
to the end and so behaves more like `KAFKA_TAIL` and `kafka-console-consumer`.
Per the use case I described in the PR, I think 'seek to end' makes more
sense. You make a change on a live stream and want to see the immediate
results. If `KAFKA_FIND` 'sticks' on an earlier offset, you're not going to
see the most recent messages, which can be confusing for the user.
#### 2. Timeouts
> How long will this command listen until it times out (or is it based on
number of messages read)? ... Is this configurable?
The command will poll for up to 5 seconds, by default. This can be
adjusted by defining a global property `stellar.kafka.max.wait`.
> Sometimes it returned an empty array immediately.
In this case, it probably pulled in messages from the topic, but none of them
matched your filter, so it returned an empty array to you.
I probably need to look at the timeout logic under these conditions. It
should probably 'try harder' to find matching messages and not return
immediately. I'll take a look at this and see if it can be improved.
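One possible shape for the 'try harder' behavior is a loop that keeps polling until it has a match or the maximum wait has elapsed. The sketch below is only an illustration of that idea, not the actual implementation: `find_with_deadline`, `FakeSource`, the `limit` parameter, and the use of seconds as the unit are all assumptions.

```python
import time
from typing import Callable, Iterable, List

DEFAULT_MAX_WAIT = 5.0  # seconds; mirrors the 5 second default, the unit is an assumption


def find_with_deadline(poll: Callable[[float], Iterable[str]],
                       predicate: Callable[[str], bool],
                       max_wait: float = DEFAULT_MAX_WAIT,
                       limit: int = 1,
                       clock: Callable[[], float] = time.monotonic) -> List[str]:
    """Keep polling until `limit` matches are found or `max_wait` has elapsed."""
    deadline = clock() + max_wait
    matches: List[str] = []
    while len(matches) < limit:
        remaining = deadline - clock()
        if remaining <= 0:
            break  # out of time: return whatever we have, possibly nothing
        for message in poll(remaining):
            if predicate(message):
                matches.append(message)
                if len(matches) == limit:
                    break
    return matches


class FakeSource:
    """Deterministic stand-in for a consumer: each poll 'takes' one second."""

    def __init__(self, batches: List[List[str]]) -> None:
        self.batches = list(batches)
        self.now = 0.0

    def clock(self) -> float:
        return self.now

    def poll(self, timeout: float) -> List[str]:
        self.now += 1.0
        return self.batches.pop(0) if self.batches else []


def is_bro(message: str) -> bool:
    return message.startswith("bro")


# The first batch has no Bro messages; the loop keeps polling instead of returning [].
source = FakeSource([["snort-1", "yaf-1"], [], ["bro-1"]])
found = find_with_deadline(source.poll, is_bro, clock=source.clock)

# Nothing ever matches; the loop gives up once the maximum wait is used up.
quiet = FakeSource([["snort-1"]])
nothing = find_with_deadline(quiet.poll, is_bro, clock=quiet.clock)
```

With this structure, an empty result would mean the full wait elapsed without a match, rather than that the first batch happened to contain no matching messages.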
> Create KAFKA_FIND Stellar Function
> ----------------------------------
>
> Key: METRON-1533
> URL: https://issues.apache.org/jira/browse/METRON-1533
> Project: Metron
> Issue Type: Improvement
> Reporter: Nick Allen
> Assignee: Nick Allen
> Priority: Minor
>
> When creating enrichments, I often find that I want to validate that the
> enrichment I just created was successful on the live, incoming stream of
> telemetry. My workflow looks something like this.
> 1. Create and test the enrichment that I want to create.
> {code:java}
> [Stellar]>>> ip_src_addr := "72.34.49.86"
> 72.34.49.86
> [Stellar]>>> geo := GEO_GET(ip_src_addr)
> {country=US, dmaCode=803, city=Los Angeles, postalCode=90014,
> latitude=34.0438, location_point=34.0438,-118.2512, locID=5368361,
> longitude=-118.2512}
> {code}
> 2. That looks good to me. Now let's add that to my Bro telemetry.
> {code:java}
> [Stellar]>>> conf := SHELL_EDIT(conf)
> {
> "enrichment" : {
> "fieldMap": {
> "stellar": {
> "config": [
> "geo := GEO_GET(ip_src_addr)"
> ]
> }
> }
> },
> "threatIntel": {
> }
> }
> [Stellar]>>> CONFIG_PUT("ENRICHMENTS", conf, "bro")
> {code}
>
> 3. It looks like that worked, but did that really work?
> At this point, I would run KAFKA_GET as many times as it takes to retrieve a
> Bro message. You would just have to get lucky and hope that the enrichment
> worked and secondly that you would pull down a Bro message (as opposed to a
> different sensor).
>
> I would rather have a function that lets me only pull back the messages that
> I care about. In this case I could either retrieve only Bro messages.
> {code:java}
> KAFKA_FIND('indexing', m -> MAP_GET('source.type', m) == 'bro')
> {code}
> Or I could look for messages that contain geolocation data.
> {code:java}
> KAFKA_FIND('indexing', m -> MAP_EXISTS('geo.city', m))
> {code}