Earlier this year I said I'd feed back how my IPA to Rsyslog to Logstash
experiments went.

They went badly.  And I didn't get much time.  Today, however, I managed
to get over my imaginary finishing line:

All systems are RHEL 6.6.

Rsyslog (rsyslog7-7.4.10) is configured to import logs from some dirsrv
log files:

# cat /etc/rsyslog.d/dirsrv.conf 
module(load="imfile" PollingInterval="2")




This pulls in new log entries at a regular polling interval.  Rsyslog 8
can use inotify to pick up file changes instead, but that version isn't
available to me.

Rsyslog is then also configured to push all logs to my Logstash servers:

# cat /etc/rsyslog.d/logstash.conf 
template(name="ls_json" type="list" option.json="on")
{ constant(value="{")
constant(value="\"@timestamp\":\"") property(name="timegenerated")
constant(value="\",\"message\":\"") property(name="msg")
constant(value="\",\"host\":\"") property(name="hostname")
constant(value="\",\"logsource\":\"") property(name="fromhost")
constant(value="\",\"severity\":\"") property(name="syslogseverity")
constant(value="\",\"facility\":\"") property(name="syslogfacility")
constant(value="\",\"program\":\"") property(name="programname")
constant(value="\",\"pid\":\"") property(name="procid")
constant(value="\",\"rawmsg\":\"") property(name="rawmsg")
constant(value="\",\"syslogtag\":\"") property(name="syslogtag")
constant(value="\"}\n")
}

*.* @@logstash01.example.com:5500;ls_json
$ActionExecOnlyWhenPreviousIsSuspended on
& @@logstash02.example.com:5500;ls_json
& /var/log/localbuffer
$ActionExecOnlyWhenPreviousIsSuspended off

This pushes all logs to my Logstash servers in JSON format.  Failover
is built in: if logstash01 is unavailable, rsyslog sends to logstash02,
and if that is also down it writes to a local file.

SELinux on the client needs to allow rsyslog to connect to TCP port
5500:

# semanage port -a -t syslogd_port_t -p tcp 5500
# semanage port -l | grep 5500
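
For anyone wanting to see what goes over the wire on that port, here is
a rough Python sketch of the JSON document the ls_json template builds
for each message.  The key names come from the template; the function
name, the host names and the sample values are made up for illustration,
and I'm assuming one JSON object per line.

```python
import json

# (rsyslog property, JSON key) pairs, in the order the ls_json template lists them.
TEMPLATE_FIELDS = [
    ("timegenerated", "@timestamp"),
    ("msg", "message"),
    ("hostname", "host"),
    ("fromhost", "logsource"),
    ("syslogseverity", "severity"),
    ("syslogfacility", "facility"),
    ("programname", "program"),
    ("procid", "pid"),
    ("rawmsg", "rawmsg"),
    ("syslogtag", "syslogtag"),
]


def render_ls_json(props):
    """Render one event roughly the way the ls_json template would."""
    # The template wraps every property in quotes, so every value is a string.
    event = {key: str(props.get(prop, "")) for prop, key in TEMPLATE_FIELDS}
    return json.dumps(event) + "\n"


# Hypothetical sample values; the real timestamp format depends on how the
# template formats the timegenerated property.
sample = {
    "timegenerated": "2014-12-22T10:15:32+00:00",
    "msg": "conn=42 op=1 RESULT err=0 tag=101 nentries=1 etime=0",
    "hostname": "ipa01.example.com",
    "fromhost": "ipa01.example.com",
    "syslogseverity": 5,
    "syslogfacility": 16,
    "programname": "dirsrv",
    "procid": "-",
    "rawmsg": "conn=42 op=1 RESULT err=0 tag=101 nentries=1 etime=0",
    "syslogtag": "dirsrv",
}

line = render_ls_json(sample)
print(line, end="")
```

Note that severity and facility arrive as strings rather than numbers,
because the template quotes every value.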

The Logstash servers are then configured to listen on this port and do
some simple grokking before sending everything to ElasticSearch:

# cat /etc/logstash/conf.d/syslog.conf 
input {
  tcp {
    type => syslogjson
    port => 5500
    codec => "json"
  }
}

filter {
  # This replaces the host field (UDP source) with the host that
  # generated the message (sysloghost)
  if [sysloghost] {
    mutate {
      replace => [ "host", "%{sysloghost}" ]
      # prune the field after successfully replacing "host"
      remove_field => "sysloghost"
    }
  }
  if [type] == "syslogjson" {
    grok {
      patterns_dir => "/opt/logstash/patterns"
      match => { "message" => "%{VIRGINFW}" }
      match => { "message" => "%{AUDITAVC}" }
      match => { "message" => "%{COMMONAPACHELOG}" }
      tag_on_failure => []
    }
  }

  # This filter populates the @timestamp field with the timestamp that's
  # in the actual message
  # dirsrv logs are currently pulled in every 2 minutes, so @timestamp
  # is wrong
  if [syslogtag] == "dirsrv" {
    mutate {
      remove_field => [ 'rawmsg' ]
    }
    grok {
      match => [ "message", "%{HTTPDATE:log_timestamp}" ]
    }
    date {
      match => [ "log_timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      locale => "en"
      remove_field => [ "log_timestamp" ]
    }
  }
}

output {
  elasticsearch {
    protocol => node
    node_name => "Indexer01"
  }
}

It works well for the most part.  I'm not yet grokking the actual
message line to pull out various bits of data into their own separate
fields, but at least I'm managing to log the access and error logs from
multiple IPA servers.
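
As a starting point for that field extraction, here is a minimal Python
sketch of the kind of parsing a grok pattern would eventually do on a
dirsrv access log line.  The regular expression, the function name and
the sample line are my own illustration (assuming the usual
"[timestamp] conn=N op=N ..." layout), not something taken from the
configuration above.

```python
import re

# Assumed layout: [timestamp] conn=<n> op=<n> <rest of the operation>
ACCESS_RE = re.compile(
    r"^\[(?P<timestamp>[^\]]+)\]\s+"
    r"conn=(?P<conn>\d+)\s+"
    r"op=(?P<op>-?\d+)\s+"
    r"(?P<rest>.*)$"
)


def parse_access_line(line):
    """Split an access log line into fields, or return None if it doesn't match."""
    match = ACCESS_RE.match(line)
    if match is None:
        return None
    fields = match.groupdict()
    # conn and op are numeric identifiers; op can be -1 on connection-level lines.
    fields["conn"] = int(fields["conn"])
    fields["op"] = int(fields["op"])
    return fields


# Hypothetical sample line for illustration only.
sample_line = (
    '[22/Dec/2014:10:15:32 +0000] conn=42 op=1 SRCH '
    'base="dc=example,dc=com" scope=2 filter="(uid=jdoe)" attrs=ALL'
)
fields = parse_access_line(sample_line)
print(fields["conn"], fields["op"], fields["rest"].split()[0])
```

Having conn and op as separate fields would make it possible to tie
together all the lines that belong to one operation.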

The @timestamp field ends up with the timestamp from the actual message
line, so it's only down to second accuracy.  This means that multiple
log lines on the same second lose their ordering when viewed in the
Logstash/Kibana interface.  But the important thing at this point is
that they're now held centrally.
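
To illustrate the ordering problem, here is a small Python sketch.  It
parses timestamps in the same day/month/year layout the date filter
uses, shows that two lines from the same second become
indistinguishable, and then keeps them in order with a tiebreaker.  The
arrival sequence number used as the tiebreaker is hypothetical; nothing
in the configuration above emits one.

```python
from datetime import datetime

# Python equivalent of the "dd/MMM/yyyy:HH:mm:ss Z" layout (English month names).
DIRSRV_TS_FORMAT = "%d/%b/%Y:%H:%M:%S %z"


def parse_ts(text):
    """Parse a timestamp such as '22/Dec/2014:10:15:32 +0000'."""
    return datetime.strptime(text, DIRSRV_TS_FORMAT)


def order_events(lines):
    """Sort (timestamp_text, message) pairs by time, then by arrival order."""
    keyed = [
        (parse_ts(ts_text), seq, message)
        for seq, (ts_text, message) in enumerate(lines)
    ]
    # The sequence number breaks ties between lines logged in the same second.
    keyed.sort(key=lambda item: (item[0], item[1]))
    return [message for _, _, message in keyed]


# Hypothetical events: the first two share a second, the third is earlier.
events = [
    ("22/Dec/2014:10:15:32 +0000", "BIND"),
    ("22/Dec/2014:10:15:32 +0000", "RESULT"),
    ("22/Dec/2014:10:15:31 +0000", "connection opened"),
]

same_second = parse_ts(events[0][0]) == parse_ts(events[1][0])
ordered = order_events(events)
print(same_second, ordered)
```

With only the timestamp to sort on, the first two events compare equal,
which is why their order is lost in the interface.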

Is it feasible to alter the timestamp resolution that dirsrv uses?  This
would help separate log lines properly.

Cheers & Merry Festive Holiday thing


