METRON-858 bro-plugin-kafka is throwing segfaults (JonZeolla) closes 
apache/metron#547


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/85872bd6
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/85872bd6
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/85872bd6

Branch: refs/heads/Metron_0.4.0
Commit: 85872bd68698149692c97a48dfe41a78435dcc99
Parents: 8779eb3
Author: JonZeolla <[email protected]>
Authored: Thu Jun 1 11:28:42 2017 -0400
Committer: jonzeolla <[email protected]>
Committed: Thu Jun 1 11:28:42 2017 -0400

----------------------------------------------------------------------
 metron-sensors/bro-plugin-kafka/README.md       | 56 ++++++++++++++++++--
 .../bro-plugin-kafka/configure.plugin           |  2 +-
 .../bro-plugin-kafka/src/KafkaWriter.cc         | 21 ++++----
 3 files changed, 63 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/85872bd6/metron-sensors/bro-plugin-kafka/README.md
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/README.md 
b/metron-sensors/bro-plugin-kafka/README.md
index 31b1f54..e219360 100644
--- a/metron-sensors/bro-plugin-kafka/README.md
+++ b/metron-sensors/bro-plugin-kafka/README.md
@@ -36,13 +36,14 @@ Installation
 Activation
 ----------
 
-The following examples highlight different ways that the plugin can be used.  
Simply add Bro script to your `local.bro` file (for example, 
`/usr/share/bro/site/local.bro`) as shown to activate the plugin.
+The following examples highlight different ways that the plugin can be used.  
Simply add the Bro script language to your `local.bro` file (for example, 
`/usr/share/bro/site/local.bro`) as shown to demonstrate the example.
 
 ### Example 1
 
 The goal in this example is to send all HTTP and DNS records to a Kafka topic 
named `bro`. 
  * Any configuration value accepted by librdkafka can be added to the 
`kafka_conf` configuration table.  
  * By defining `topic_name` all records will be sent to the same Kafka topic.
+ * Defining `logs_to_send` will ensure that only HTTP and DNS records are sent.
 
 ```
 @load Bro/Kafka/logs-to-kafka.bro
@@ -73,7 +74,6 @@ event bro_init()
         $name = "kafka-http",
         $writer = Log::WRITER_KAFKAWRITER,
         $config = table(
-                ["stream_id"] = "HTTP::LOG",
                 ["metadata.broker.list"] = "localhost:9092"
         ),
         $path = "http"
@@ -85,7 +85,6 @@ event bro_init()
         $name = "kafka-dns",
         $writer = Log::WRITER_KAFKAWRITER,
         $config = table(
-                ["stream_id"] = "DNS::LOG",
                 ["metadata.broker.list"] = "localhost:9092"
         ),
         $path = "dns"
@@ -94,6 +93,57 @@ event bro_init()
 }
 ```
 
+### Example 3
+
+You may want to configure bro to filter log messages with certain 
characteristics from being sent to your kafka topics.  For instance, Metron 
currently doesn't support IPv6 source or destination IPs in the default 
enrichments, so it may be helpful to filter those log messages from being sent 
to kafka (although there are [multiple ways](#notes) to approach this).  In 
this example we will do just that, and are assuming a somewhat standard bro 
kafka plugin configuration, such that:
+ * All bro logs are sent to the `bro` topic, by configuring 
`Kafka::topic_name`.
+ * Each JSON message is tagged with the appropriate log type (such as `http`, 
`dns`, or `conn`), by setting `tag_json` to true.
+ * If the log message contains a 128 bit long (IPv6) source or destination IP 
address, the log is not sent to kafka.
+
+```
+@load Bro/Kafka/logs-to-kafka.bro
+redef Kafka::topic_name = "bro";
+redef Kafka::tag_json = T;
+
+event bro_init() &priority=-5
+{
+    # handles HTTP
+    Log::add_filter(HTTP::LOG, [
+        $name = "kafka-http",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $pred(rec: HTTP::Info) = { return ! (( |rec$id$orig_h| == 128 || 
|rec$id$resp_h| == 128 )); },
+        $config = table(
+            ["metadata.broker.list"] = "localhost:9092"
+        )
+    ]);
+
+    # handles DNS
+    Log::add_filter(DNS::LOG, [
+        $name = "kafka-dns",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $pred(rec: DNS::Info) = { return ! (( |rec$id$orig_h| == 128 || 
|rec$id$resp_h| == 128 )); },
+        $config = table(
+            ["metadata.broker.list"] = "localhost:9092"
+        )
+    ]);
+
+    # handles Conn
+    Log::add_filter(Conn::LOG, [
+        $name = "kafka-conn",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $pred(rec: Conn::Info) = { return ! (( |rec$id$orig_h| == 128 || 
|rec$id$resp_h| == 128 )); },
+        $config = table(
+            ["metadata.broker.list"] = "localhost:9092"
+        )
+    ]);
+}
+```
+
+#### Notes
+ * `logs_to_send` is mutually exclusive with `$pred`, thus for each log you 
want to set `$pred` on, you must individually set up a `Log::add_filter` and 
refrain from including that log in `logs_to_send`.
+ * You can also filter IPv6 logs from within your Metron cluster [using 
Stellar](../../metron-platform/metron-common#IS_IP).  In that case, you 
wouldn't apply a predicate in your bro configuration, and instead Stellar would 
filter the logs out before they were processed by the enrichment layer of 
Metron.
+ * It is also possible to use the `is_v6_subnet()` bro function in your 
predicate, as of their [2.5 
release](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-5), 
however the above example should work on [bro 
2.4](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-4) and 
newer, which has been the focus of the kafka plugin.
+
 Settings
 --------
 

http://git-wip-us.apache.org/repos/asf/metron/blob/85872bd6/metron-sensors/bro-plugin-kafka/configure.plugin
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/configure.plugin 
b/metron-sensors/bro-plugin-kafka/configure.plugin
index 1cb2086..c7e6662 100644
--- a/metron-sensors/bro-plugin-kafka/configure.plugin
+++ b/metron-sensors/bro-plugin-kafka/configure.plugin
@@ -31,7 +31,7 @@ plugin_option()
 {
   case "$1" in
     --with-librdkafka=*)
-      append_cache_entry LibRdKafka_ROOT_DIR PATH $optarg
+      append_cache_entry LibRDKafka_ROOT_DIR PATH $optarg
       ;;
     --with-openssl=*)
       append_cache_entry OpenSSL_ROOT_DIR PATH $optarg

http://git-wip-us.apache.org/repos/asf/metron/blob/85872bd6/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc 
b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
index 951a60c..c9ad44f 100644
--- a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
+++ b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
@@ -75,13 +75,10 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int 
num_fields, const threading
     debug.assign((const char*)BifConst::Kafka::debug->Bytes(), 
BifConst::Kafka::debug->Len());
     bool is_debug(!debug.empty());
     if(is_debug) {
-      reporter->Info( "Debug is turned on and set to: %s.  Available debug 
context: %s."
-                     , debug.c_str()
-                     , RdKafka::get_debug_contexts().c_str()
-                     );
+      MsgThread::Info(Fmt("Debug is turned on and set to: %s.  Available debug 
context: %s.", debug.c_str(), RdKafka::get_debug_contexts().c_str()));
     }
     else {
-      reporter->Info( "Debug is turned off.");
+      MsgThread::Info(Fmt("Debug is turned off."));
     }
 
     // kafka global configuration
@@ -96,7 +93,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int 
num_fields, const threading
 
       // apply setting to kafka
       if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
-          reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), 
val.c_str(), err.c_str());
+          Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), 
err.c_str()));
           return false;
       }
     }
@@ -105,7 +102,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int 
num_fields, const threading
         string key("debug");
         string val(debug);
         if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
-            reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), 
val.c_str(), err.c_str());
+            Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), 
err.c_str()));
             return false;
         }
     }
@@ -113,7 +110,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int 
num_fields, const threading
     // create kafka producer
     producer = RdKafka::Producer::create(conf, err);
     if (!producer) {
-        reporter->Error("Failed to create producer: %s", err.c_str());
+        Error(Fmt("Failed to create producer: %s", err.c_str()));
         return false;
     }
 
@@ -121,12 +118,12 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int 
num_fields, const threading
     topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
     topic = RdKafka::Topic::create(producer, topic_name, topic_conf, err);
     if (!topic) {
-        reporter->Error("Failed to create topic handle: %s", err.c_str());
+        Error(Fmt("Failed to create topic handle: %s", err.c_str()));
         return false;
     }
 
     if(is_debug) {
-        reporter->Info("Successfully created producer.");
+        MsgThread::Info(Fmt("Successfully created producer."));
     }
 
     return true;
@@ -154,7 +151,7 @@ bool KafkaWriter::DoFinish(double network_time)
     if (producer->outq_len() == 0) {
         success = true;
     } else {
-        reporter->Error("Unable to deliver %0d message(s)", 
producer->outq_len());
+        Error(Fmt("Unable to deliver %0d message(s)", producer->outq_len()));
     }
 
     delete topic;
@@ -187,7 +184,7 @@ bool KafkaWriter::DoWrite(int num_fields, const 
threading::Field* const* fields,
     }
     else {
         string err = RdKafka::err2str(resp);
-        reporter->Error("Kafka send failed: %s", err.c_str());
+        Error(Fmt("Kafka send failed: %s", err.c_str()));
     }
 
     return true;

Reply via email to