This is an automated email from the ASF dual-hosted git repository.
jonzeolla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron-bro-plugin-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new d96568e  METRON-1992 Support sending a log to multiple topics (JonZeolla) closes apache/metron-bro-plugin-kafka#23
d96568e is described below
commit d96568e6df21b3f091ffc3ad3fd7f93d7cb16788
Author: JonZeolla <[email protected]>
AuthorDate: Thu Feb 14 09:25:46 2019 -0500
METRON-1992 Support sending a log to multiple topics (JonZeolla) closes apache/metron-bro-plugin-kafka#23
---
README.md | 41 +++++++++++++++++++++++++++++++++++++++--
src/KafkaWriter.cc | 18 +++++++++++++++---
src/KafkaWriter.h | 2 ++
3 files changed, 56 insertions(+), 5 deletions(-)
diff --git a/README.md b/README.md
index 7cc2c46..72436e9 100644
--- a/README.md
+++ b/README.md
@@ -114,7 +114,7 @@ redef Kafka::kafka_conf = table(
### Example 2 - Send all active logs
-This plugin has the ability send all active logs to kafka with the following configuration.
+This plugin has the ability to send all active logs to the "bro" kafka topic with the following configuration.
```
@load packages/metron-bro-plugin-kafka/Apache/Kafka
@@ -138,7 +138,7 @@ redef Kafka::kafka_conf = table(
);
```
-### Example 4 - Send logs to unique topics
+### Example 4 - Send each bro log to a unique topic
It is also possible to send each log stream to a uniquely named topic. The
goal in this example is to send all HTTP records to a Kafka topic named `http`
and all DNS records to a separate Kafka topic named `dns`.
* The `topic_name` value must be set to an empty string.
@@ -228,6 +228,43 @@ event bro_init() &priority=-10
* You can also filter IPv6 logs from within your Metron cluster [using
Stellar](https://github.com/apache/metron/tree/master/metron-stellar/stellar-common#is_ip).
In that case, you wouldn't apply a predicate in your bro configuration, and
instead Stellar would filter the logs out before they were processed by the
enrichment layer of Metron.
* It is also possible to use the `is_v6_subnet()` bro function in your
predicate, as of their [2.5
release](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-5),
however the above example should work on [bro
2.4](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-4) and
newer, which has been the focus of the kafka plugin.
+### Example 6 - Sending a log to multiple topics
+
+You are able to send a single bro log to multiple kafka topics in the same
+kafka cluster by overriding the default topic (configured with
+`Kafka::topic_name`) through a custom bro `Log::Filter`.  In this example, the
+DHCP, RADIUS, and DNS logs are sent to the "bro" topic; the RADIUS log is
+duplicated to the "shew_bro_radius" topic; and the DHCP log is duplicated to
+the "shew_bro_dhcp" topic.
+
+```
+@load packages/metron-bro-plugin-kafka/Apache/Kafka
+redef Kafka::logs_to_send = set(DHCP::LOG, RADIUS::LOG, DNS::LOG);
+redef Kafka::topic_name = "bro";
+redef Kafka::kafka_conf = table(
+  ["metadata.broker.list"] = "server1.example.com:9092,server2.example.com:9092"
+);
+redef Kafka::tag_json = T;
+
+event bro_init() &priority=-10
+{
+  # Send RADIUS to the shew_bro_radius topic
+  local shew_radius_filter: Log::Filter = [
+    $name = "kafka-radius-shew",
+    $writer = Log::WRITER_KAFKAWRITER,
+    $path = "shew_bro_radius",
+    $config = table(["topic_name"] = "shew_bro_radius")
+  ];
+  Log::add_filter(RADIUS::LOG, shew_radius_filter);
+
+  # Send DHCP to the shew_bro_dhcp topic
+  local shew_dhcp_filter: Log::Filter = [
+    $name = "kafka-dhcp-shew",
+    $writer = Log::WRITER_KAFKAWRITER,
+    $path = "shew_bro_dhcp",
+    $config = table(["topic_name"] = "shew_bro_dhcp")
+  ];
+  Log::add_filter(DHCP::LOG, shew_dhcp_filter);
+}
+```
+
+_Note_: Because `Kafka::tag_json` is set to True in this example, the value of
+`$path` is used as the tag for each `Log::Filter`.  If you add a log filter
+with the same `$path` as an existing filter, bro will append "-N", where N is
+an integer starting at 2, to the end of the log path so that each filter has
+its own unique log path.  For instance, the second instance of `conn` would
+become `conn-2`.
+
## Settings
### `logs_to_send`
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index 1d4a28a..563ef74 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -73,6 +73,15 @@ KafkaWriter::~KafkaWriter()
// Cleanup must happen in DoFinish, not in the destructor
}
+string KafkaWriter::GetConfigValue(const WriterInfo& info, const string name) const
+{
+  map<const char*, const char*>::const_iterator it = info.config.find(name.c_str());
+  if (it == info.config.end())
+    return string();
+  else
+    return it->second;
+}
+
/**
* DoInit is called once for each call to the constructor, but in a separate
* thread
@@ -82,9 +91,12 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
// Timeformat object, default to TS_EPOCH
threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH;
-  // if no global 'topic_name' is defined, use the log stream's 'path'
-  if(topic_name.empty()) {
-    topic_name = info.path;
+  // Allow overriding of the kafka topic via the Bro script constant "topic_name",
+  // which can be applied when adding a new Bro log filter.
+  topic_name_override = GetConfigValue(info, "topic_name");
+
+  if(!topic_name_override.empty()) {
+    topic_name = topic_name_override;
}
/**
diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h
index 14e0f7e..c67c664 100644
--- a/src/KafkaWriter.h
+++ b/src/KafkaWriter.h
@@ -65,12 +65,14 @@ protected:
virtual bool DoHeartbeat(double network_time, double current_time);
private:
+ string GetConfigValue(const WriterInfo& info, const string name) const;
static const string default_topic_key;
string stream_id;
bool tag_json;
string json_timestamps;
map<string, string> kafka_conf;
string topic_name;
+ string topic_name_override;
threading::formatter::Formatter *formatter;
RdKafka::Producer* producer;
RdKafka::Topic* topic;