Repository: incubator-metron Updated Branches: refs/heads/master 09f13b2f9 -> a2452a25c
METRON-812: Make the bro-kafka plugin work with kerberos this closes apache/incubator-metron#501 Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/a2452a25 Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/a2452a25 Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/a2452a25 Branch: refs/heads/master Commit: a2452a25caffdd8c35fd9efe0ed49ce0dd2e3781 Parents: 09f13b2 Author: cstella <ceste...@gmail.com> Authored: Mon Apr 3 09:52:25 2017 -0400 Committer: cstella <ceste...@gmail.com> Committed: Mon Apr 3 09:52:25 2017 -0400 ---------------------------------------------------------------------- .../roles/librdkafka/defaults/main.yml | 4 +- .../roles/librdkafka/tasks/dependencies.yml | 3 + .../roles/librdkafka/tasks/librdkafka.yml | 2 +- metron-sensors/bro-plugin-kafka/README | 92 ----------- metron-sensors/bro-plugin-kafka/README.md | 160 +++++++++++++++++++ .../bro-plugin-kafka/scripts/init.bro | 1 + .../bro-plugin-kafka/src/KafkaWriter.cc | 33 +++- .../bro-plugin-kafka/src/KafkaWriter.h | 7 +- metron-sensors/bro-plugin-kafka/src/Plugin.cc | 1 - metron-sensors/bro-plugin-kafka/src/Plugin.h | 1 + metron-sensors/bro-plugin-kafka/src/kafka.bif | 1 + 11 files changed, 200 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a2452a25/metron-deployment/roles/librdkafka/defaults/main.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/roles/librdkafka/defaults/main.yml b/metron-deployment/roles/librdkafka/defaults/main.yml index d920883..063c22f 100644 --- a/metron-deployment/roles/librdkafka/defaults/main.yml +++ b/metron-deployment/roles/librdkafka/defaults/main.yml @@ -15,6 +15,6 @@ # limitations under the License. 
# --- -librdkafka_version: 0.8.6 -librdkafka_url: https://github.com/edenhill/librdkafka/archive/0.8.6.tar.gz +librdkafka_version: 0.9.4 +librdkafka_url: https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz librdkafka_home: /usr/local http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a2452a25/metron-deployment/roles/librdkafka/tasks/dependencies.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/roles/librdkafka/tasks/dependencies.yml b/metron-deployment/roles/librdkafka/tasks/dependencies.yml index 431e861..72ff907 100644 --- a/metron-deployment/roles/librdkafka/tasks/dependencies.yml +++ b/metron-deployment/roles/librdkafka/tasks/dependencies.yml @@ -31,6 +31,9 @@ - swig - zlib-devel - perl + - cyrus-sasl + - cyrus-sasl-devel + - cyrus-sasl-gssapi register: result until: result.rc == 0 retries: 5 http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a2452a25/metron-deployment/roles/librdkafka/tasks/librdkafka.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/roles/librdkafka/tasks/librdkafka.yml b/metron-deployment/roles/librdkafka/tasks/librdkafka.yml index 652d319..a7971d6 100644 --- a/metron-deployment/roles/librdkafka/tasks/librdkafka.yml +++ b/metron-deployment/roles/librdkafka/tasks/librdkafka.yml @@ -34,6 +34,6 @@ creates: "{{ librdkafka_home }}/lib/librdkafka.so" with_items: - rm -rf build/ - - "./configure --prefix={{ librdkafka_home }}" + - "./configure --prefix={{ librdkafka_home }} --enable-sasl" - make - make install http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a2452a25/metron-sensors/bro-plugin-kafka/README ---------------------------------------------------------------------- diff --git a/metron-sensors/bro-plugin-kafka/README b/metron-sensors/bro-plugin-kafka/README deleted file mode 100644 index e6caa7a..0000000 --- a/metron-sensors/bro-plugin-kafka/README +++ /dev/null @@ -1,92 +0,0 @@ -Bro 
Logging Output to Kafka -=========================== - -A Bro log writer that sends logging output to Kafka. This provides a convenient -means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to -process the data generated by Bro. - -Installation ------------- - -Install librdkafka (https://github.com/edenhill/librdkafka), a native client -library for Kafka. This plugin has been tested against the latest release of -librdkafka, which at the time of this writing is v0.8.6. - - # curl -L https://github.com/edenhill/librdkafka/archive/0.8.6.tar.gz | tar xvz - # cd librdkafka-0.8.6/ - # ./configure - # make - # sudo make install - -Then compile this Bro plugin using the following commands. - - # ./configure --bro-dist=$BRO_SRC - # make - # sudo make install - -Run the following command to ensure that the plugin was installed successfully. - - # bro -N Bro::Kafka - Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1) - -Activation ----------- - -The easiest way to enable Kafka output is to load the plugin's -``logs-to-kafka.bro`` script. If you are using BroControl, the following lines -added to local.bro will activate it. - -``` -@load Bro/Kafka/logs-to-kafka.bro -redef Kafka::logs_to_send = set(Conn::LOG, HTTP::LOG, DNS::LOG); -redef Kafka::topic_name = "bro"; -redef Kafka::kafka_conf = table( - ["metadata.broker.list"] = "localhost:9092" -); -``` - -This example will send all HTTP, DNS, and Conn logs to a Kafka broker running on -the localhost to a topic called ``bro``. Any configuration value accepted by -librdkafka can be added to the ``kafka_conf`` configuration table. - -Settings --------- - -### ``kafka_conf`` - -The global configuration settings for Kafka. These values are passed through -directly to librdkafka. Any valid librdkafka settings can be defined in this -table. 
- -``` -redef Kafka::kafka_conf = table( - ["metadata.broker.list"] = "localhost:9092", - ["client.id"] = "bro" -); -``` - -### ``topic_name`` - -The name of the topic in Kafka where all Bro logs will be sent to. - -``` -redef Kafka::topic_name = "bro"; -``` - -### ``max_wait_on_shutdown`` - -The maximum number of milliseconds that the plugin will wait for any backlog of -queued messages to be sent to Kafka before forced shutdown. - -``` -redef Kafka::max_wait_on_shutdown = 3000; -``` - -### ``tag_json`` - -If true, a log stream identifier is appended to each JSON-formatted message. For -example, a Conn::LOG message will look like ``{ 'conn' : { ... }}``. - -``` -redef Kafka::tag_json = T; -``` http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a2452a25/metron-sensors/bro-plugin-kafka/README.md ---------------------------------------------------------------------- diff --git a/metron-sensors/bro-plugin-kafka/README.md b/metron-sensors/bro-plugin-kafka/README.md new file mode 100644 index 0000000..e9646e9 --- /dev/null +++ b/metron-sensors/bro-plugin-kafka/README.md @@ -0,0 +1,160 @@ +Bro Logging Output to Kafka +=========================== + +A Bro log writer that sends logging output to Kafka. This provides a convenient +means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to +process the data generated by Bro. + +Installation +------------ + +Install librdkafka (https://github.com/edenhill/librdkafka), a native client +library for Kafka. This plugin has been tested against the latest release of +librdkafka, which at the time of this writing is v0.9.4. In order to support interacting +with a kerberized kafka, you will need libsasl2 installed + + # curl -L https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz | tar xvz + # cd librdkafka-0.9.4/ + # ./configure --enable-sasl + # make + # sudo make install + +Then compile this Bro plugin using the following commands. 
+ + # ./configure --bro-dist=$BRO_SRC + # make + # sudo make install + +Run the following command to ensure that the plugin was installed successfully. + + # bro -N Bro::Kafka + Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1) + +Activation +---------- + +The easiest way to enable Kafka output is to load the plugin's +``logs-to-kafka.bro`` script. If you are using BroControl, the following lines +added to local.bro will activate it. + +``` +@load Bro/Kafka/logs-to-kafka.bro +redef Kafka::logs_to_send = set(Conn::LOG, HTTP::LOG, DNS::LOG); +redef Kafka::topic_name = "bro"; +redef Kafka::kafka_conf = table( + ["metadata.broker.list"] = "localhost:9092" +); +``` + +This example will send all HTTP, DNS, and Conn logs to a Kafka broker running on +the localhost to a topic called ``bro``. Any configuration value accepted by +librdkafka can be added to the ``kafka_conf`` configuration table. + +Settings +-------- + +### ``kafka_conf`` + +The global configuration settings for Kafka. These values are passed through +directly to librdkafka. Any valid librdkafka settings can be defined in this +table. The full set of valid librdkafka settings is available +[here](https://github.com/edenhill/librdkafka/blob/v0.9.4/CONFIGURATION.md). + +``` +redef Kafka::kafka_conf = table( + ["metadata.broker.list"] = "localhost:9092", + ["client.id"] = "bro" +); +``` + +### ``topic_name`` + +The name of the topic in Kafka to which all Bro logs will be sent. + +``` +redef Kafka::topic_name = "bro"; +``` + +### ``max_wait_on_shutdown`` + +The maximum number of milliseconds that the plugin will wait for any backlog of +queued messages to be sent to Kafka before forced shutdown. + +``` +redef Kafka::max_wait_on_shutdown = 3000; +``` + +### ``tag_json`` + +If true, a log stream identifier is appended to each JSON-formatted message. For +example, a Conn::LOG message will look like ``{ 'conn' : { ... }}``. 
+ +``` +redef Kafka::tag_json = T; +``` + +### ``debug`` + +A comma-separated list of debug contexts in librdkafka which you want to +enable. The available contexts are: +* generic +* broker +* topic +* metadata +* queue +* msg +* protocol +* cgrp +* security +* fetch +* feature +* all + +Kerberos +-------- + +This plugin supports producing messages to a kerberized kafka. There +are a couple of prerequisites and a couple of settings to set. + +### SASL +If you are using SASL as a security protocol for kafka, then you must have +libsasl or libsasl2 installed. You can tell if sasl is enabled by +running the following from the directory in which you have built +librdkafka: +``` +examples/rdkafka_example -X builtin.features +builtin.features = gzip,snappy,ssl,sasl,regex +``` + +### Producer Config + +As stated above, you can configure the producer kafka configs in +`${BRO_HOME}/share/bro/site/local.bro`. There are a few configs +necessary to set, which are described +[here](https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka). 
+For an environment where the following is true: +* The broker is `node1:6667` +* This kafka is using `SASL_PLAINTEXT` as the security protocol +* The keytab used is the `metron` keytab +* The service principal for `metron` is `met...@example.com` + +The kafka topic `bro` has been given permission for the `metron` user to +write: +``` +# login using the metron user +kinit -kt /etc/security/keytabs/metron.headless.keytab met...@example.com +${KAFKA_HOME}/kafka-broker/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:metron --topic bro +``` + +The following is how the `${BRO_HOME}/share/bro/site/local.bro` looks: +``` +@load Bro/Kafka/logs-to-kafka.bro +redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG); +redef Kafka::topic_name = "bro"; +redef Kafka::tag_json = T; +redef Kafka::kafka_conf = table( ["metadata.broker.list"] = "node1:6667" + , ["security.protocol"] = "SASL_PLAINTEXT" + , ["sasl.kerberos.keytab"] = "/etc/security/keytabs/metron.headless.keytab" + , ["sasl.kerberos.principal"] = "met...@example.com" + ); +``` http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a2452a25/metron-sensors/bro-plugin-kafka/scripts/init.bro ---------------------------------------------------------------------- diff --git a/metron-sensors/bro-plugin-kafka/scripts/init.bro b/metron-sensors/bro-plugin-kafka/scripts/init.bro index c76b2a6..1f06c2d 100644 --- a/metron-sensors/bro-plugin-kafka/scripts/init.bro +++ b/metron-sensors/bro-plugin-kafka/scripts/init.bro @@ -24,4 +24,5 @@ export { const kafka_conf: table[string] of string = table( ["metadata.broker.list"] = "localhost:9092" ) &redef; + const debug: string = "" &redef; } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a2452a25/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc ---------------------------------------------------------------------- diff --git 
a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc index 9019790..79a85ed 100644 --- a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc +++ b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc @@ -15,11 +15,6 @@ * limitations under the License. */ -#include <Type.h> -#include <threading/Formatter.h> -#include <threading/formatters/JSON.h> -#include "kafka.bif.h" -#include "TaggedJSON.h" #include "KafkaWriter.h" using namespace logging; @@ -46,6 +41,18 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading // kafka global configuration string err; + string debug; + debug.assign((const char*)BifConst::Kafka::debug->Bytes(), BifConst::Kafka::debug->Len()); + bool is_debug(!debug.empty()); + if(is_debug) { + reporter->Info( "Debug is turned on and set to: %s. Available debug context: %s." + , debug.c_str() + , RdKafka::get_debug_contexts().c_str() + ); + } + else { + reporter->Info( "Debug is turned off."); + } conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); // apply the user-defined settings to kafka @@ -60,6 +67,9 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading string key = index->Index(0)->AsString()->CheckString(); string val = v->Value()->AsString()->CheckString(); + if(is_debug) { + reporter->Info("Setting '%s'='%s'", key.c_str(), val.c_str()); + } // apply setting to kafka if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) { reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str()); @@ -71,6 +81,15 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading delete k; } + if(is_debug) { + string key("debug"); + string val(debug); + if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) { + reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str()); + return false; + } + } + // create kafka producer producer = RdKafka::Producer::create(conf, err); if 
(!producer) { @@ -85,7 +104,9 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading reporter->Error("Failed to create topic handle: %s", err.c_str()); return false; } - + if(is_debug) { + reporter->Info("Successfully created producer."); + } return true; } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a2452a25/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h ---------------------------------------------------------------------- diff --git a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h index 2299667..7e77bc0 100644 --- a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h +++ b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h @@ -18,13 +18,14 @@ #ifndef BRO_PLUGIN_BRO_KAFKA_KAFKAWRITER_H #define BRO_PLUGIN_BRO_KAFKA_KAFKAWRITER_H -#include <string> #include <librdkafka/rdkafkacpp.h> +#include <string> +#include <Type.h> #include <logging/WriterBackend.h> #include <threading/formatters/JSON.h> -#include <Type.h> -#include "kafka.bif.h" +#include <threading/Formatter.h> +#include "kafka.bif.h" #include "TaggedJSON.h" namespace logging { namespace writer { http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a2452a25/metron-sensors/bro-plugin-kafka/src/Plugin.cc ---------------------------------------------------------------------- diff --git a/metron-sensors/bro-plugin-kafka/src/Plugin.cc b/metron-sensors/bro-plugin-kafka/src/Plugin.cc index d523d23..f3dec8b 100644 --- a/metron-sensors/bro-plugin-kafka/src/Plugin.cc +++ b/metron-sensors/bro-plugin-kafka/src/Plugin.cc @@ -16,7 +16,6 @@ */ #include "Plugin.h" -#include "KafkaWriter.h" namespace plugin { namespace Bro_Kafka { Plugin plugin; http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a2452a25/metron-sensors/bro-plugin-kafka/src/Plugin.h ---------------------------------------------------------------------- diff --git a/metron-sensors/bro-plugin-kafka/src/Plugin.h 
b/metron-sensors/bro-plugin-kafka/src/Plugin.h index 8adeb18..a169076 100644 --- a/metron-sensors/bro-plugin-kafka/src/Plugin.h +++ b/metron-sensors/bro-plugin-kafka/src/Plugin.h @@ -18,6 +18,7 @@ #ifndef BRO_PLUGIN_BRO_KAFKA #define BRO_PLUGIN_BRO_KAFKA +#include "KafkaWriter.h" #include <plugin/Plugin.h> namespace plugin { namespace Bro_Kafka { http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a2452a25/metron-sensors/bro-plugin-kafka/src/kafka.bif ---------------------------------------------------------------------- diff --git a/metron-sensors/bro-plugin-kafka/src/kafka.bif b/metron-sensors/bro-plugin-kafka/src/kafka.bif index 8a8070c..2f5a2b5 100644 --- a/metron-sensors/bro-plugin-kafka/src/kafka.bif +++ b/metron-sensors/bro-plugin-kafka/src/kafka.bif @@ -21,3 +21,4 @@ const kafka_conf: config; const topic_name: string; const max_wait_on_shutdown: count; const tag_json: bool; +const debug: string;