Dear Patrick,

a while ago, Mark Martinec wrote a script that pulls Redis logs out to standard output, which can be easily fed into splunk.

With a little help of a skilled perl programmer, I am totally sure you can extend attached script to do whatever you want ;)

cheers, Jernej

On 05/10/14 17:47, Patrick Proniewski wrote:
Hi,

Not that hard... well, it depends on who you are talking about :)
I'm afraid I can barely read Perl. So writing a proper plugin, ready for
production on 3 servers, it kind of a challenge for me. An easier
approach would be to write a shell/python/ruby/whatever script for
Splunk to pull data from Redis, and then define a "script data source"
using this script. But I want to make sure that I've not missed
anything. May be I can leverage some Amavisd-new functionality or
setting to get closer from my goal.


Joolee <[email protected]> wrote:
It wouldn't be that hard to create a plugin for that using the amavis
custom hooks api. I'm planning on writing one myself to feed KairosDB
with statistical information and log some extra information about a
mail to db/file.

On 5 October 2014 13:11, Patrick Proniewski
<[email protected]
<mailto:[email protected]>> wrote:

    Hello,

    I've given up on ELK (ElasticSearch/Logstash/Kibana), and I'm
    moving to Splunk. Amavisd-new ability to log in JSON format is a
    very great feature, and I would like to be able to pipe my JSON
    logs to Splunk.

    The redis output is still defined, from my past tests with ELK and
    I have defined this:

    $log_templ = <<'EOD';
    [:report_json]
    EOD

    Unfortunately I've got some problem feeding logs into Splunk:

    - Splunk won't pull data from a Redis server. It just does not
    have proper connector for that.
    - Amavisd-new will not log pure JSON into a file, there's always
    regular log lines (start/stop for example) and every mail analysis
    log entry is prefixed with "time-stamp hostname binary-path[PID]:
    (thread-number)", JSON comes only after all those informations.
    Hence, Splunk fails to recognize proper JSON, and won't index the
    log file.
    - Using Syslog with JSON output is not an option, on FreeBSD
    syslogd can't handle lines longer than 1000 Bytes.

    Any help is greatly appreciated.

    I'm registered to digest, feel free to {B}Cc me.

    Patrick PRONIEWSKI
    --
    Responsable pôle Opérations - DSI - Université Lumière Lyon 2
    Responsable Sécurité des Systèmes d'Information




Patrick PRONIEWSKI
--
Responsable pôle Opérations - DSI - Université Lumière Lyon 2
Responsable Sécurité des Systèmes d'Information


#!/usr/bin/perl -T

package LogsFeeder;

# Reads events from a Redis queue, writes them to stdout.

use strict;
use re 'taint';
use warnings;

my @log_keys = ( 'logstash-amavis' );

my @storage_redis_dsn = (
  { server => "127.0.0.1:6379", db_id => 1 },
);

binmode(STDOUT,':bytes') or die "Can't set STDOUT to bytes mode: $!";
my $redis_storage = LogsFeeder::Redis->new(@storage_redis_dsn);
$redis_storage or die "Can't create a Redis object";
for (;;) {
  my $json = $redis_storage->receive(@log_keys);  # UTF-8 encoded
  next if !defined $json;
  $json .= "\n";
  print $json;
}

1;


package LogsFeeder::TinyRedis;

use strict;
use re 'taint';
use warnings;
use utf8;

use Errno qw(EINTR EAGAIN EPIPE ENOTCONN ECONNRESET ECONNABORTED);
use IO::Socket::IP;

sub new {
  my($class, %args) = @_;
  my $self = bless { args => {%args} }, $class;
  my $outbuf = ''; $self->{outbuf} = \$outbuf;
  $self->{batch_size} = 0;
  $self->{server} = $args{server} || $args{sock} || '127.0.0.1:6379';
  $self->{on_connect} = $args{on_connect};
  return if !$self->connect;
  $self;
}

sub DESTROY {
  my $self = $_[0];
  local($@, $!, $_);
  undef $self->{sock};
}

sub disconnect {
  my $self = $_[0];
  local($@, $!);
  undef $self->{sock};
}

sub connect {
  my $self = $_[0];

  $self->disconnect;
  my $sock;
  my $server = $self->{server};
  $sock = IO::Socket::IP->new(PeerAddr => $server, Proto => 'tcp');
  if ($sock) {
    $self->{sock} = $sock;

    $self->{sock_fd} = $sock->fileno; $self->{fd_mask} = '';
    vec($self->{fd_mask}, $self->{sock_fd}, 1) = 1;

    # an on_connect() callback must not use batched calls!
    $self->{on_connect}->($self)  if $self->{on_connect};
  }
  $sock;
}

# Receive, parse and return $cnt consecutive redis replies as a list.
#
sub _response {
  my($self, $cnt) = @_;

  my $sock = $self->{sock};
  if (!$sock) {
    $self->connect  or die "Connect failed: $!";
    $sock = $self->{sock};
  };

  my @list;

  for (1 .. $cnt) {

    my $result = <$sock>;
    if (!defined $result) {
      $self->disconnect;
      die "Error reading from Redis server: $!";
    }
    chomp $result;
    my $resp_type = substr($result, 0, 1, '');

    if ($resp_type eq '$') {  # bulk reply
      if ($result < 0) {
        push(@list, undef);  # null bulk reply
      } else {
        my $data = ''; my $ofs = 0; my $len = $result + 2;
        while ($len > 0) {
          my $nbytes = read($sock, $data, $len, $ofs);
          if (!$nbytes) {
            $self->disconnect;
            defined $nbytes  or die "Error reading from Redis server: $!";
            die "Redis server closed connection";
          }
          $ofs += $nbytes; $len -= $nbytes;
        }
        chomp $data;
        push(@list, $data);
      }

    } elsif ($resp_type eq ':') {  # integer reply
      push(@list, 0+$result);

    } elsif ($resp_type eq '+') {  # status reply
      push(@list, $result);

    } elsif ($resp_type eq '*') {  # multi-bulk reply
      push(@list, $result < 0 ? undef : $self->_response(0+$result) );

    } elsif ($resp_type eq '-') {  # error reply
      die "$result\n";

    } else {
      die "Unknown Redis reply: $resp_type ($result)";
    }
  }
  \@list;
}

sub _write_buff {
  my($self, $bufref) = @_;

  if (!$self->{sock}) { $self->connect or die "Connect failed: $!" };
  my $nwrite;
  for (my $ofs = 0; $ofs < length($$bufref); $ofs += $nwrite) {
    # to reliably detect a disconnect we need to check for an input event
    # using a select; checking status of syswrite is not sufficient
    my($rout, $wout, $inbuff); my $fd_mask = $self->{fd_mask};
    my $nfound = select($rout=$fd_mask, $wout=$fd_mask, undef, undef);
    defined $nfound && $nfound >= 0 or die "Select failed: $!";
    if (vec($rout, $self->{sock_fd}, 1) &&
        !sysread($self->{sock}, $inbuff, 1024)) {
      # eof, try reconnecting
      $self->connect  or die "Connect failed: $!";
    }
    local $SIG{PIPE} = 'IGNORE';  # don't signal on a write to a widowed pipe
    $nwrite = syswrite($self->{sock}, $$bufref, length($$bufref)-$ofs, $ofs);
    next if defined $nwrite;
    $nwrite = 0;
    if ($! == EINTR || $! == EAGAIN) {  # no big deal, try again
      Time::HiRes::sleep(0.1);  # slow down, just in case
    } else {
      $self->disconnect;
      if ($! == ENOTCONN   || $! == EPIPE ||
          $! == ECONNRESET || $! == ECONNABORTED) {
        $self->connect  or die "Connect failed: $!";
      } else {
        die "Error writing to redis socket: $!";
      }
    }
  }
  1;
}

# Send a redis command with arguments, returning a redis reply.
#
sub call {
  my $self = shift;

  my $buff = '*' . scalar(@_) . "\015\012";
  $buff .= '$' . length($_) . "\015\012" . $_ . "\015\012"  for @_;

  $self->_write_buff(\$buff);
  local($/) = "\015\012";
  my $arr_ref = $self->_response(1);
  $arr_ref && $arr_ref->[0];
}

# Append a redis command with arguments to a batch.
#
sub b_call {
  my $self = shift;

  my $bufref = $self->{outbuf};
  $$bufref .= '*' . scalar(@_) . "\015\012";
  $$bufref .= '$' . length($_) . "\015\012" . $_ . "\015\012"  for @_;
  ++ $self->{batch_size};
}

# Send a batch of commands, returning an arrayref of redis replies,
# each array element corresponding to one command in a batch.
#
sub b_results {
  my $self = $_[0];
  my $batch_size = $self->{batch_size};
  return if !$batch_size;
  my $bufref = $self->{outbuf};
  $self->_write_buff($bufref);
  $$bufref = ''; $self->{batch_size} = 0;
  local($/) = "\015\012";
  $self->_response($batch_size);
}

1;

package LogsFeeder::Redis;
use strict;
use re 'taint';
use warnings;

sub new {
  my($class, @redis_dsn) = @_;
  bless { redis_dsn => \@redis_dsn }, $class;
}

sub disconnect {
  my $self = $_[0];
  $self->{connected} = 0; undef $self->{redis};
}

sub on_connect {
  my($self, $r) = @_;
  my $db_id = $self->{db_id} || 0;
  eval {
    $r->call('SELECT', $db_id) eq 'OK' ? 1 : 0;
  } or do {
    if ($@ =~ /\bNOAUTH\b/) {
      defined $self->{password}
        or die "Redis server requires authentication, no password provided";
      $r->call('AUTH', $self->{password});
      $r->call('SELECT', $db_id);
    } else {
      chomp $@; die "Redis error: $@";
    }
  };
  $r->call('CLIENT', 'SETNAME', 'amavis['.$$.']');
  1;
}

sub connect {
  my $self = $_[0];
  $self->disconnect  if $self->{connected};
  $self->{redis} = $self->{db_id} = $self->{ttl} = undef;

  my($r, $err, $dsn, %options);
  my $dsn_list_ref = $self->{redis_dsn};
  for my $j (1 .. @$dsn_list_ref) {
    $dsn = $dsn_list_ref->[0];
    %options = ref $dsn eq 'HASH' ? %$dsn : ();
    # expiration time (time-to-live) is 16 days by default
    $self->{ttl} = exists $options{ttl} ? $options{ttl} : undef;
    $self->{db_id} = $options{db_id};
    if (defined $options{password}) {
      $self->{password} = $options{password};
      $options{password} = '(hidden)';  # for logging purposes
    }
    undef $err;
    eval {
      my %opt = %options; delete @opt{qw(ttl db_id password)};
      $r = LogsFeeder::TinyRedis->new(on_connect => sub { $self->on_connect(@_) }, %opt);
      $r or die "Error: $!";
    } or do {
      undef $r; $err = $@; chomp $err;
    };
    $self->{redis} = $r;
    last if $r;  # success, done
    if ($j < @$dsn_list_ref) {  # not all tried yet
      push(@$dsn_list_ref, shift @$dsn_list_ref);  # rotate left
    }
  }
  if (!$r) {
    $self->{redis} = $self->{db_id} = $self->{ttl} = undef;
    die sprintf("Can't connect to a redis server %s: %s\n",
                join(' ',%options), $err);
  }
  $self->{connected} = 1;
  $r;
}

sub DESTROY {
  my $self = $_[0]; local($@,$!,$_);
  eval { $self->{connected} = 0; undef $self->{redis} };
}

sub send {
  my($self, $str_ref, $log_key, $max_queue_len) = @_;
  $max_queue_len ||= 500000;

  $self->connect  if !$self->{connected};
  my $r = $self->{redis};
  $r->b_call("RPUSH", $log_key, $$str_ref);
  # keep most recent
  $r->b_call("LTRIM", $log_key, -$max_queue_len, -1)  if $max_queue_len;
  my $res = $r->b_results;  # errors will be signalled
  return $res->[0];  # current list length
}

sub receive {
  my($self, @log_keys) = @_;
  $self->connect  if !$self->{connected};
  my $r = $self->{redis};
  my $res;
  eval {
    $res = $r->call("BLPOP", @log_keys, 0); 1;
  } or do {
    chomp $@;
    do_log(1, "redis receive failed: %s", $@);
    undef $res;
  };
  return if !$res;
  return $res->[1];  # popped element
}

1;

Reply via email to