Repository: metron-bro-plugin-kafka Updated Branches: refs/heads/master a2680de29 -> 1dfc5239f
METRON-1469 Kafka Plugin for Bro - Configurable JSON Timestamps (dcode via nickwallen) closes apache/metron-bro-plugin-kafka#6 Project: http://git-wip-us.apache.org/repos/asf/metron-bro-plugin-kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/metron-bro-plugin-kafka/commit/1dfc5239 Tree: http://git-wip-us.apache.org/repos/asf/metron-bro-plugin-kafka/tree/1dfc5239 Diff: http://git-wip-us.apache.org/repos/asf/metron-bro-plugin-kafka/diff/1dfc5239 Branch: refs/heads/master Commit: 1dfc5239fae31a64026188109d1e346ce93d5c02 Parents: a2680de Author: dcode <[email protected]> Authored: Sat Mar 17 09:27:16 2018 -0400 Committer: nickallen <[email protected]> Committed: Sat Mar 17 09:27:16 2018 -0400 ---------------------------------------------------------------------- README.md | 14 ++++++++++--- scripts/init.bro | 1 + src/KafkaWriter.cc | 55 ++++++++++++++++++++++++++++++++++++++++++++----- src/KafkaWriter.h | 3 ++- src/kafka.bif | 1 + 5 files changed, 65 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron-bro-plugin-kafka/blob/1dfc5239/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 0c67347..8aff3ba 100644 --- a/README.md +++ b/README.md @@ -37,11 +37,10 @@ The following examples highlight different ways that the plugin can be used. Si ### Example 1 -The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`. +The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`. * Any configuration value accepted by librdkafka can be added to the `kafka_conf` configuration table. * By defining `topic_name` all records will be sent to the same Kafka topic. - * Defining `logs_to_send` will ensure that only HTTP and DNS records are sent. - + * Defining `logs_to_send` will ensure that only HTTP and DNS records are sent. 
``` @load packages/metron-bro-plugin-kafka/Apache/Kafka redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG); @@ -183,6 +182,15 @@ example, a Conn::LOG message will look like `{ 'conn' : { ... }}`. redef Kafka::tag_json = T; ``` +### `json_timestamps` + +Uses Ascii log writer for timestamp format. Default is `JSON::TS_EPOCH`. Other +options are `JSON::TS_MILLIS` and `JSON::TS_ISO8601`. + +``` +redef Kafka::json_timestamps = JSON::TS_ISO8601; +``` + ### `debug` A comma separated list of debug contexts in librdkafka which you want to http://git-wip-us.apache.org/repos/asf/metron-bro-plugin-kafka/blob/1dfc5239/scripts/init.bro ---------------------------------------------------------------------- diff --git a/scripts/init.bro b/scripts/init.bro index 65fb9e7..ad9f0a1 100644 --- a/scripts/init.bro +++ b/scripts/init.bro @@ -22,6 +22,7 @@ export { const topic_name: string = "bro" &redef; const max_wait_on_shutdown: count = 3000 &redef; const tag_json: bool = F &redef; + const json_timestamps: JSON::TimestampFormat = JSON::TS_EPOCH &redef; const kafka_conf: table[string] of string = table( ["metadata.broker.list"] = "localhost:9092" ) &redef; http://git-wip-us.apache.org/repos/asf/metron-bro-plugin-kafka/blob/1dfc5239/src/KafkaWriter.cc ---------------------------------------------------------------------- diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc index c9ad44f..79b5aa0 100644 --- a/src/KafkaWriter.cc +++ b/src/KafkaWriter.cc @@ -20,7 +20,11 @@ using namespace logging; using namespace writer; -KafkaWriter::KafkaWriter(WriterFrontend* frontend): WriterBackend(frontend), formatter(NULL), producer(NULL), topic(NULL) +KafkaWriter::KafkaWriter(WriterFrontend* frontend): + WriterBackend(frontend), + formatter(NULL), + producer(NULL), + topic(NULL) { // need thread-local copies of all user-defined settings coming from // bro scripting land. 
accessing these is not thread-safe and 'DoInit' @@ -29,6 +33,14 @@ KafkaWriter::KafkaWriter(WriterFrontend* frontend): WriterBackend(frontend), for // tag_json - thread local copy tag_json = BifConst::Kafka::tag_json; + // json_timestamps + ODesc tsfmt; + BifConst::Kafka::json_timestamps->Describe(&tsfmt); + json_timestamps.assign( + (const char*) tsfmt.Bytes(), + tsfmt.Len() + ); + // topic name - thread local copy topic_name.assign( (const char*)BifConst::Kafka::topic_name->Bytes(), @@ -54,20 +66,53 @@ KafkaWriter::KafkaWriter(WriterFrontend* frontend): WriterBackend(frontend), for } KafkaWriter::~KafkaWriter() -{} +{ + + // Cleanup all the things + delete topic; + delete producer; + delete formatter; + delete conf; + delete topic_conf; + +} bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields) { + // Timeformat object, default to TS_EPOCH + threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH; + // if no global 'topic_name' is defined, use the log stream's 'path' if(topic_name.empty()) { topic_name = info.path; } + // format timestamps + // NOTE: This string comparison implementation is currently the necessary + // way to do it, as there isn't a way to pass the Bro enum into C++ enum. + // This makes the user interface consistent with the existing Bro Logging + // configuration for the ASCII log output. 
+ if ( strcmp(json_timestamps.c_str(), "JSON::TS_EPOCH") == 0 ) { + tf = threading::formatter::JSON::TS_EPOCH; + } + else if ( strcmp(json_timestamps.c_str(), "JSON::TS_MILLIS") == 0 ) { + tf = threading::formatter::JSON::TS_MILLIS; + } + else if ( strcmp(json_timestamps.c_str(), "JSON::TS_ISO8601") == 0 ) { + tf = threading::formatter::JSON::TS_ISO8601; + } + else { + Error(Fmt("KafkaWriter::DoInit: Invalid JSON timestamp format %s", + json_timestamps.c_str())); + return false; + } + // initialize the formatter if(BifConst::Kafka::tag_json) { - formatter = new threading::formatter::TaggedJSON(info.path, this, threading::formatter::JSON::TS_EPOCH); - } else { - formatter = new threading::formatter::JSON(this, threading::formatter::JSON::TS_EPOCH); + formatter = new threading::formatter::TaggedJSON(info.path, this, tf); + } + else { + formatter = new threading::formatter::JSON(this, tf); } // is debug enabled http://git-wip-us.apache.org/repos/asf/metron-bro-plugin-kafka/blob/1dfc5239/src/KafkaWriter.h ---------------------------------------------------------------------- diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h index ad3e03f..14e0f7e 100644 --- a/src/KafkaWriter.h +++ b/src/KafkaWriter.h @@ -47,7 +47,7 @@ namespace logging { namespace writer { class KafkaWriter : public WriterBackend { public: - KafkaWriter(WriterFrontend* frontend); + explicit KafkaWriter(WriterFrontend* frontend); ~KafkaWriter(); static WriterBackend* Instantiate(WriterFrontend* frontend) @@ -68,6 +68,7 @@ private: static const string default_topic_key; string stream_id; bool tag_json; + string json_timestamps; map<string, string> kafka_conf; string topic_name; threading::formatter::Formatter *formatter; http://git-wip-us.apache.org/repos/asf/metron-bro-plugin-kafka/blob/1dfc5239/src/kafka.bif ---------------------------------------------------------------------- diff --git a/src/kafka.bif b/src/kafka.bif index 2f5a2b5..2709072 100644 --- a/src/kafka.bif +++ b/src/kafka.bif @@ -21,4 
+21,5 @@ const kafka_conf: config; const topic_name: string; const max_wait_on_shutdown: count; const tag_json: bool; +const json_timestamps: JSON::TimestampFormat; const debug: string;
