Dear Patrick,
a while ago, Mark Martinec wrote a script that pulls Redis logs out to
standard output, which can easily be fed into Splunk.
With a little help from a skilled Perl programmer, I am sure you can
extend the attached script to do whatever you want ;)
cheers, Jernej
On 05/10/14 17:47, Patrick Proniewski wrote:
Hi,
Not that hard... well, it depends on who you are talking about :)
I'm afraid I can barely read Perl, so writing a proper plugin, ready for
production on 3 servers, is kind of a challenge for me. An easier
approach would be to write a shell/python/ruby/whatever script for
Splunk to pull data from Redis, and then define a "script data source"
using that script. But I want to make sure that I've not missed
anything. Maybe I can leverage some Amavisd-new functionality or
setting to get closer to my goal.
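A "script data source" of that kind would be declared as a Splunk scripted input. The stanza below is only a sketch: the script path, index name and interval are placeholders, not values from this thread, and assume a feeder script like the one attached further down.
# illustrative inputs.conf stanza for running the Redis-to-stdout feeder
[script:///opt/amavis/bin/logsfeeder.pl]
interval = 60          # how often Splunk (re)launches the script if it is not already running
sourcetype = _json     # Splunk's built-in JSON sourcetype, so events are parsed as JSON
index = mail           # hypothetical index name
disabled = 0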
Joolee <[email protected]> wrote:
It wouldn't be that hard to create a plugin for that using the amavis
custom hooks API. I'm planning on writing one myself to feed KairosDB
with statistical information and log some extra information about a
mail to db/file.
On 5 October 2014 13:11, Patrick Proniewski <[email protected]> wrote:
Hello,
I've given up on ELK (Elasticsearch/Logstash/Kibana), and I'm
moving to Splunk. Amavisd-new's ability to log in JSON format is a
great feature, and I would like to be able to pipe my JSON
logs to Splunk.
The Redis output is still defined from my past tests with ELK, and
I have defined this:
$log_templ = <<'EOD';
[:report_json]
EOD
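For reference, the Redis side of such a setup looks roughly like this in amavisd.conf. The two $redis_logging_* names below are from memory of the amavisd-new 2.9 release notes and should be verified there; the list key must match the @log_keys used by the feeder script attached further down.
@storage_redis_dsn = ( { server => '127.0.0.1:6379', db_id => 1 } );
$redis_logging_key = 'logstash-amavis';      # Redis list that amavisd RPUSHes JSON log events onto
$redis_logging_queue_size_limit = 300000;    # example cap on list length, keeps Redis memory bounded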
Unfortunately I've got some problems feeding logs into Splunk:
- Splunk won't pull data from a Redis server. It just does not
have a proper connector for that.
- Amavisd-new will not log pure JSON into a file: there are always
regular log lines (start/stop for example), and every mail analysis
log entry is prefixed with "time-stamp hostname binary-path[PID]:
(thread-number)"; the JSON comes only after all that information.
Hence, Splunk fails to recognize proper JSON and won't index the
log file.
- Using syslog with JSON output is not an option: on FreeBSD,
syslogd can't handle lines longer than 1000 bytes.
Any help is greatly appreciated.
I'm registered to the digest, feel free to (B)Cc me.
Patrick PRONIEWSKI
--
Head of Operations - DSI - Université Lumière Lyon 2
Information Systems Security Officer
#!/usr/bin/perl -T
package LogsFeeder;
# Reads events from a Redis queue, writes them to stdout.
use strict;
use re 'taint';
use warnings;
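# Redis list key(s) to consume; must match the list key that amavisd-new
# pushes its JSON log events onto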
my @log_keys = ( 'logstash-amavis' );
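# Redis server(s) and database holding the amavisd log queue; tried in turn
# until one connects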
my @storage_redis_dsn = (
{ server => "127.0.0.1:6379", db_id => 1 },
);
binmode(STDOUT,':bytes') or die "Can't set STDOUT to bytes mode: $!";
$| = 1;  # autoflush STDOUT so each event reaches the consumer (e.g. Splunk) immediately
my $redis_storage = LogsFeeder::Redis->new(@storage_redis_dsn);
$redis_storage or die "Can't create a Redis object";
for (;;) {
my $json = $redis_storage->receive(@log_keys); # UTF-8 encoded
next if !defined $json;
$json .= "\n";
print $json;
}
1;
package LogsFeeder::TinyRedis;
use strict;
use re 'taint';
use warnings;
use utf8;
use Errno qw(EINTR EAGAIN EPIPE ENOTCONN ECONNRESET ECONNABORTED);
use Time::HiRes ();  # for the short sleep between write retries in _write_buff
use IO::Socket::IP;
sub new {
my($class, %args) = @_;
my $self = bless { args => {%args} }, $class;
my $outbuf = ''; $self->{outbuf} = \$outbuf;
$self->{batch_size} = 0;
$self->{server} = $args{server} || $args{sock} || '127.0.0.1:6379';
$self->{on_connect} = $args{on_connect};
return if !$self->connect;
$self;
}
sub DESTROY {
my $self = $_[0];
local($@, $!, $_);
undef $self->{sock};
}
sub disconnect {
my $self = $_[0];
local($@, $!);
undef $self->{sock};
}
sub connect {
my $self = $_[0];
$self->disconnect;
my $sock;
my $server = $self->{server};
$sock = IO::Socket::IP->new(PeerAddr => $server, Proto => 'tcp');
if ($sock) {
$self->{sock} = $sock;
$self->{sock_fd} = $sock->fileno; $self->{fd_mask} = '';
vec($self->{fd_mask}, $self->{sock_fd}, 1) = 1;
# an on_connect() callback must not use batched calls!
$self->{on_connect}->($self) if $self->{on_connect};
}
$sock;
}
# Receive, parse and return $cnt consecutive redis replies as a list.
#
sub _response {
my($self, $cnt) = @_;
my $sock = $self->{sock};
if (!$sock) {
$self->connect or die "Connect failed: $!";
$sock = $self->{sock};
};
my @list;
for (1 .. $cnt) {
my $result = <$sock>;
if (!defined $result) {
$self->disconnect;
die "Error reading from Redis server: $!";
}
chomp $result;
my $resp_type = substr($result, 0, 1, '');
if ($resp_type eq '$') { # bulk reply
if ($result < 0) {
push(@list, undef); # null bulk reply
} else {
my $data = ''; my $ofs = 0; my $len = $result + 2;
while ($len > 0) {
my $nbytes = read($sock, $data, $len, $ofs);
if (!$nbytes) {
$self->disconnect;
defined $nbytes or die "Error reading from Redis server: $!";
die "Redis server closed connection";
}
$ofs += $nbytes; $len -= $nbytes;
}
chomp $data;
push(@list, $data);
}
} elsif ($resp_type eq ':') { # integer reply
push(@list, 0+$result);
} elsif ($resp_type eq '+') { # status reply
push(@list, $result);
} elsif ($resp_type eq '*') { # multi-bulk reply
push(@list, $result < 0 ? undef : $self->_response(0+$result) );
} elsif ($resp_type eq '-') { # error reply
die "$result\n";
} else {
die "Unknown Redis reply: $resp_type ($result)";
}
}
\@list;
}
sub _write_buff {
my($self, $bufref) = @_;
if (!$self->{sock}) { $self->connect or die "Connect failed: $!" };
my $nwrite;
for (my $ofs = 0; $ofs < length($$bufref); $ofs += $nwrite) {
# to reliably detect a disconnect we need to check for an input event
# using a select; checking status of syswrite is not sufficient
my($rout, $wout, $inbuff); my $fd_mask = $self->{fd_mask};
my $nfound = select($rout=$fd_mask, $wout=$fd_mask, undef, undef);
defined $nfound && $nfound >= 0 or die "Select failed: $!";
if (vec($rout, $self->{sock_fd}, 1) &&
!sysread($self->{sock}, $inbuff, 1024)) {
# eof, try reconnecting
$self->connect or die "Connect failed: $!";
}
local $SIG{PIPE} = 'IGNORE'; # don't signal on a write to a widowed pipe
$nwrite = syswrite($self->{sock}, $$bufref, length($$bufref)-$ofs, $ofs);
next if defined $nwrite;
$nwrite = 0;
if ($! == EINTR || $! == EAGAIN) { # no big deal, try again
Time::HiRes::sleep(0.1); # slow down, just in case
} else {
$self->disconnect;
if ($! == ENOTCONN || $! == EPIPE ||
$! == ECONNRESET || $! == ECONNABORTED) {
$self->connect or die "Connect failed: $!";
} else {
die "Error writing to redis socket: $!";
}
}
}
1;
}
# Send a redis command with arguments, returning a redis reply.
#
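# E.g. $r->call('PING') returns 'PONG'; $r->call('LLEN', 'somekey') returns an integer.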
sub call {
my $self = shift;
my $buff = '*' . scalar(@_) . "\015\012";
$buff .= '$' . length($_) . "\015\012" . $_ . "\015\012" for @_;
$self->_write_buff(\$buff);
local($/) = "\015\012";
my $arr_ref = $self->_response(1);
$arr_ref && $arr_ref->[0];
}
# Append a redis command with arguments to a batch.
#
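# Batched commands are only written to the socket (pipelined) when b_results is called.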
sub b_call {
my $self = shift;
my $bufref = $self->{outbuf};
$$bufref .= '*' . scalar(@_) . "\015\012";
$$bufref .= '$' . length($_) . "\015\012" . $_ . "\015\012" for @_;
++ $self->{batch_size};
}
# Send a batch of commands, returning an arrayref of redis replies,
# each array element corresponding to one command in a batch.
#
sub b_results {
my $self = $_[0];
my $batch_size = $self->{batch_size};
return if !$batch_size;
my $bufref = $self->{outbuf};
$self->_write_buff($bufref);
$$bufref = ''; $self->{batch_size} = 0;
local($/) = "\015\012";
$self->_response($batch_size);
}
1;
package LogsFeeder::Redis;
use strict;
use re 'taint';
use warnings;
sub new {
my($class, @redis_dsn) = @_;
bless { redis_dsn => \@redis_dsn }, $class;
}
sub disconnect {
my $self = $_[0];
$self->{connected} = 0; undef $self->{redis};
}
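# Called by TinyRedis after every (re)connect: select the configured database,
# authenticate first if the server demands it, and set a client name for debugging.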
sub on_connect {
my($self, $r) = @_;
my $db_id = $self->{db_id} || 0;
eval {
$r->call('SELECT', $db_id) eq 'OK' ? 1 : 0;
} or do {
if ($@ =~ /\bNOAUTH\b/) {
defined $self->{password}
or die "Redis server requires authentication, no password provided";
$r->call('AUTH', $self->{password});
$r->call('SELECT', $db_id);
} else {
chomp $@; die "Redis error: $@";
}
};
$r->call('CLIENT', 'SETNAME', 'amavis['.$$.']');
1;
}
sub connect {
my $self = $_[0];
$self->disconnect if $self->{connected};
$self->{redis} = $self->{db_id} = $self->{ttl} = undef;
my($r, $err, $dsn, %options);
my $dsn_list_ref = $self->{redis_dsn};
for my $j (1 .. @$dsn_list_ref) {
$dsn = $dsn_list_ref->[0];
%options = ref $dsn eq 'HASH' ? %$dsn : ();
# optional expiration time (time-to-live); unused by this feeder, left undefined unless given
$self->{ttl} = exists $options{ttl} ? $options{ttl} : undef;
$self->{db_id} = $options{db_id};
if (defined $options{password}) {
$self->{password} = $options{password};
$options{password} = '(hidden)'; # for logging purposes
}
undef $err;
eval {
my %opt = %options; delete @opt{qw(ttl db_id password)};
$r = LogsFeeder::TinyRedis->new(on_connect => sub { $self->on_connect(@_) }, %opt);
$r or die "Error: $!";
} or do {
undef $r; $err = $@; chomp $err;
};
$self->{redis} = $r;
last if $r; # success, done
if ($j < @$dsn_list_ref) { # not all tried yet
push(@$dsn_list_ref, shift @$dsn_list_ref); # rotate left
}
}
if (!$r) {
$self->{redis} = $self->{db_id} = $self->{ttl} = undef;
die sprintf("Can't connect to a redis server %s: %s\n",
join(' ',%options), $err);
}
$self->{connected} = 1;
$r;
}
sub DESTROY {
my $self = $_[0]; local($@,$!,$_);
eval { $self->{connected} = 0; undef $self->{redis} };
}
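# Push one log event onto a Redis list (RPUSH) and trim the list (LTRIM) so it
# never grows beyond $max_queue_len entries; the oldest entries are discarded first.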
sub send {
my($self, $str_ref, $log_key, $max_queue_len) = @_;
$max_queue_len ||= 500000;
$self->connect if !$self->{connected};
my $r = $self->{redis};
$r->b_call("RPUSH", $log_key, $$str_ref);
# keep most recent
$r->b_call("LTRIM", $log_key, -$max_queue_len, -1) if $max_queue_len;
my $res = $r->b_results; # errors will be signalled
return $res->[0]; # current list length
}
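# Block until an event is available on any of the given list keys and return it;
# on a Redis error, report the failure on stderr and return undef so the caller can retry.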
sub receive {
my($self, @log_keys) = @_;
$self->connect if !$self->{connected};
my $r = $self->{redis};
my $res;
eval {
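# a timeout of 0 makes BLPOP block indefinitely until an element arrives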
$res = $r->call("BLPOP", @log_keys, 0); 1;
} or do {
chomp $@;
warn sprintf("redis receive failed: %s\n", $@);  # do_log() is amavisd-internal, not available here
undef $res;
};
return if !$res;
return $res->[1]; # popped element
}
1;