Repository: flume Updated Branches: refs/heads/trunk 5cfa8be41 -> 671fc3f1b
FLUME-3133 Add client IP / hostname headers to Syslog sources. In the newer version of the Syslog message format (RFC-5424) the hostname is not a mandatory header anymore so the Syslog client might not send it. On the Flume side it would be a useful information that could be used in interceptors or for event routing. To keep this information, two new properties have been added to the Syslog sources: clientIPHeader and clientHostnameHeader. Flume users can define custom event header names through these parameters for storing the IP address / hostname of the Syslog client in the Flume event as headers. The IP address / hostname are retrieved from the underlying network sockets, not from the Syslog message. This change is based on the patch submitted by Jinjiang Ling which has been rebased onto the current trunk and the review comments have been implemented. This closes #234 Reviewers: Ferenc Szabo, Endre Major (Peter Turcsanyi via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/671fc3f1 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/671fc3f1 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/671fc3f1 Branch: refs/heads/trunk Commit: 671fc3f1b6d6b2239b171f582b189d0fbadedb6f Parents: 5cfa8be Author: Peter Turcsanyi <[email protected]> Authored: Wed Nov 14 13:25:06 2018 +0100 Committer: Ferenc Szabo <[email protected]> Committed: Wed Nov 14 13:25:06 2018 +0100 ---------------------------------------------------------------------- .../flume/source/MultiportSyslogTCPSource.java | 35 +++++++-- .../SyslogSourceConfigurationConstants.java | 4 + .../apache/flume/source/SyslogTcpSource.java | 47 ++++++++++-- .../apache/flume/source/SyslogUDPSource.java | 29 +++++++ .../org/apache/flume/source/SyslogUtils.java | 34 +++++++++ .../source/TestMultiportSyslogTCPSource.java | 80 +++++++++++++++----- .../flume/source/TestSyslogTcpSource.java | 49 ++++++++++++ .../flume/source/TestSyslogUdpSource.java | 47 +++++++++++- .../apache/flume/source/TestSyslogUtils.java | 58 ++++++++++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 68 +++++++++++++---- 10 files changed, 407 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java index d6abd37..9cd7de5 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java @@ -71,6 +71,8 @@ public class MultiportSyslogTCPSource extends SslContextAwareAbstractSource impl private int batchSize; private int readBufferSize; private String portHeader; + private String clientIPHeader; + private String clientHostnameHeader; private SourceCounter sourceCounter = null; private Charset defaultCharset; private ThreadSafeDecoder defaultDecoder; @@ -141,7 +143,13 @@ public class MultiportSyslogTCPSource extends SslContextAwareAbstractSource impl SyslogSourceConfigurationConstants.DEFAULT_BATCHSIZE); portHeader = context.getString( - SyslogSourceConfigurationConstants.CONFIG_PORT_HEADER); + SyslogSourceConfigurationConstants.CONFIG_PORT_HEADER); + + clientIPHeader = context.getString( + SyslogSourceConfigurationConstants.CONFIG_CLIENT_IP_HEADER); + + clientHostnameHeader = context.getString( + SyslogSourceConfigurationConstants.CONFIG_CLIENT_HOSTNAME_HEADER); readBufferSize = context.getInteger( SyslogSourceConfigurationConstants.CONFIG_READBUF_SIZE, @@ -181,8 +189,8 @@ public class MultiportSyslogTCPSource extends SslContextAwareAbstractSource impl acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); acceptor.setHandler(new MultiportSyslogHandler(maxEventSize, batchSize, - getChannelProcessor(), sourceCounter, portHeader, defaultDecoder, - portCharsets, keepFields)); + getChannelProcessor(), sourceCounter, portHeader, clientIPHeader, + clientHostnameHeader, defaultDecoder, portCharsets, keepFields)); for (int port : ports) { InetSocketAddress addr; @@ -237,6 +245,8 @@ public class MultiportSyslogTCPSource extends SslContextAwareAbstractSource impl private final int batchSize; private final SourceCounter sourceCounter; private final String portHeader; + private final String clientIPHeader; + private final String clientHostnameHeader; private final SyslogParser syslogParser; private final LineSplitter lineSplitter; private final ThreadSafeDecoder defaultDecoder; @@ -245,14 +255,16 @@ public class MultiportSyslogTCPSource extends SslContextAwareAbstractSource impl public MultiportSyslogHandler(int maxEventSize, int batchSize, ChannelProcessor cp, SourceCounter ctr, String portHeader, - ThreadSafeDecoder defaultDecoder, - ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets, - Set<String> keepFields) { + String clientIPHeader, String clientHostnameHeader, + ThreadSafeDecoder defaultDecoder, ConcurrentMap<Integer, + ThreadSafeDecoder> portCharsets, Set<String> keepFields) { channelProcessor = cp; sourceCounter = ctr; this.maxEventSize = maxEventSize; this.batchSize = batchSize; this.portHeader = portHeader; + this.clientIPHeader = clientIPHeader; + this.clientHostnameHeader = clientHostnameHeader; this.defaultDecoder = defaultDecoder; this.portCharsets = portCharsets; this.keepFields = keepFields; @@ -320,6 +332,17 @@ public class MultiportSyslogTCPSource extends SslContextAwareAbstractSource impl if (portHeader != null) { event.getHeaders().put(portHeader, String.valueOf(port)); } + + if (clientIPHeader != null) { + event.getHeaders().put(clientIPHeader, + SyslogUtils.getIP(session.getRemoteAddress())); + } + + if (clientHostnameHeader != null) { + event.getHeaders().put(clientHostnameHeader, + SyslogUtils.getHostname(session.getRemoteAddress())); + } + events.add(event); } else { logger.trace("Parsed null event"); http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java index fb8df81..c88c957 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java @@ -61,6 +61,7 @@ public final class SyslogSourceConfigurationConstants { public static final String CONFIG_PORT_HEADER = "portHeader"; + @Deprecated public static final String DEFAULT_PORT_HEADER = "port"; public static final String CONFIG_READBUF_SIZE = "readBufferBytes"; @@ -74,6 +75,9 @@ public final class SyslogSourceConfigurationConstants { public static final String CONFIG_KEEP_FIELDS_TIMESTAMP = "timestamp"; public static final String CONFIG_KEEP_FIELDS_HOSTNAME = "hostname"; + public static final String CONFIG_CLIENT_IP_HEADER = "clientIPHeader"; + public static final String CONFIG_CLIENT_HOSTNAME_HEADER = "clientHostnameHeader"; + private SyslogSourceConfigurationConstants() { // Disable explicit creation of objects. } http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java index 067c21b..39aa5cb 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java @@ -67,10 +67,14 @@ public class SyslogTcpSource extends SslContextAwareAbstractSource private Map<String, String> formaterProp; private SourceCounter sourceCounter; private Set<String> keepFields; + private String clientIPHeader; + private String clientHostnameHeader; public class syslogTcpHandler extends SimpleChannelHandler { private SyslogUtils syslogUtils = new SyslogUtils(); + private String clientIPHeader; + private String clientHostnameHeader; public void setEventSize(int eventSize) { syslogUtils.setEventSize(eventSize); @@ -84,6 +88,14 @@ public class SyslogTcpSource extends SslContextAwareAbstractSource syslogUtils.addFormats(prop); } + public void setClientIPHeader(String clientIPHeader) { + this.clientIPHeader = clientIPHeader; + } + + public void setClientHostnameHeader(String clientHostnameHeader) { + this.clientHostnameHeader = clientHostnameHeader; + } + @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { ChannelBuffer buff = (ChannelBuffer) mEvent.getMessage(); @@ -94,6 +106,17 @@ public class SyslogTcpSource extends SslContextAwareAbstractSource "rest of the event is received."); continue; } + + if (clientIPHeader != null) { + e.getHeaders().put(clientIPHeader, + SyslogUtils.getIP(ctx.getChannel().getRemoteAddress())); + } + + if (clientHostnameHeader != null) { + e.getHeaders().put(clientHostnameHeader, + SyslogUtils.getHostname(ctx.getChannel().getRemoteAddress())); + } + sourceCounter.incrementEventReceivedCount(); try { @@ -120,7 +143,8 @@ public class SyslogTcpSource extends SslContextAwareAbstractSource ServerBootstrap serverBootstrap = new ServerBootstrap(factory); serverBootstrap.setPipelineFactory(new PipelineFactory( - eventSize, formaterProp, keepFields, getSslEngineSupplier(false) + eventSize, formaterProp, keepFields, clientIPHeader, clientHostnameHeader, + getSslEngineSupplier(false) )); logger.info("Syslog TCP Source starting..."); @@ -163,11 +187,15 @@ public class SyslogTcpSource extends SslContextAwareAbstractSource host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST); eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE); formaterProp = context.getSubProperties( - SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX); + SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX); keepFields = SyslogUtils.chooseFieldsToKeep( - context.getString( - SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, - SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS)); + context.getString( + SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, + SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS)); + clientIPHeader = + context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_IP_HEADER); + clientHostnameHeader = + context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_HOSTNAME_HEADER); if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); @@ -193,13 +221,18 @@ public class SyslogTcpSource extends SslContextAwareAbstractSource private final Integer eventSize; private final Map<String, String> formaterProp; private final Set<String> keepFields; + private String clientIPHeader; + private String clientHostnameHeader; private Supplier<Optional<SSLEngine>> sslEngineSupplier; public PipelineFactory(Integer eventSize, Map<String, String> formaterProp, - Set<String> keepFields, Supplier<Optional<SSLEngine>> sslEngineSupplier) { + Set<String> keepFields, String clientIPHeader, String clientHostnameHeader, + Supplier<Optional<SSLEngine>> sslEngineSupplier) { this.eventSize = eventSize; this.formaterProp = formaterProp; this.keepFields = keepFields; + this.clientIPHeader = clientIPHeader; + this.clientHostnameHeader = clientHostnameHeader; this.sslEngineSupplier = sslEngineSupplier; } @@ -209,6 +242,8 @@ public class SyslogTcpSource extends SslContextAwareAbstractSource handler.setEventSize(eventSize); handler.setFormater(formaterProp); handler.setKeepFields(keepFields); + handler.setClientIPHeader(clientIPHeader); + handler.setClientHostnameHeader(clientHostnameHeader); ChannelPipeline pipeline = Channels.pipeline(handler); http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java index 1e47f34..fac067b 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java @@ -57,6 +57,8 @@ public class SyslogUDPSource extends AbstractSource private Channel nettyChannel; private Map<String, String> formaterProp; private Set<String> keepFields; + private String clientIPHeader; + private String clientHostnameHeader; private static final Logger logger = LoggerFactory.getLogger(SyslogUDPSource.class); @@ -68,6 +70,8 @@ public class SyslogUDPSource extends AbstractSource public class syslogHandler extends SimpleChannelHandler { private SyslogUtils syslogUtils = new SyslogUtils(DEFAULT_INITIAL_SIZE, null, true); + private String clientIPHeader; + private String clientHostnameHeader; public void setFormater(Map<String, String> prop) { syslogUtils.addFormats(prop); @@ -77,6 +81,14 @@ public class SyslogUDPSource extends AbstractSource syslogUtils.setKeepFields(keepFields); } + public void setClientIPHeader(String clientIPHeader) { + this.clientIPHeader = clientIPHeader; + } + + public void setClientHostnameHeader(String clientHostnameHeader) { + this.clientHostnameHeader = clientHostnameHeader; + } + @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { try { @@ -85,6 +97,17 @@ public class SyslogUDPSource extends AbstractSource if (e == null) { return; } + + if (clientIPHeader != null) { + e.getHeaders().put(clientIPHeader, + SyslogUtils.getIP(mEvent.getRemoteAddress())); + } + + if (clientHostnameHeader != null) { + e.getHeaders().put(clientHostnameHeader, + SyslogUtils.getHostname(mEvent.getRemoteAddress())); + } + sourceCounter.incrementEventReceivedCount(); getChannelProcessor().processEvent(e); @@ -109,6 +132,8 @@ public class SyslogUDPSource extends AbstractSource final syslogHandler handler = new syslogHandler(); handler.setFormater(formaterProp); handler.setKeepFields(keepFields); + handler.setClientIPHeader(clientIPHeader); + handler.setClientHostnameHeader(clientHostnameHeader); serverBootstrap.setOption("receiveBufferSizePredictorFactory", new AdaptiveReceiveBufferSizePredictorFactory(DEFAULT_MIN_SIZE, DEFAULT_INITIAL_SIZE, maxsize)); @@ -160,6 +185,10 @@ public class SyslogUDPSource extends AbstractSource context.getString( SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS)); + clientIPHeader = + context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_IP_HEADER); + clientHostnameHeader = + context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_HOSTNAME_HEADER); if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java index 2df5ae0..032366d 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java @@ -28,6 +28,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Clock; @@ -186,6 +188,38 @@ public class SyslogUtils { return body; } + public static String getIP(SocketAddress socketAddress) { + try { + InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; + String ip = inetSocketAddress.getAddress().getHostAddress(); + if (ip != null) { + return ip; + } else { + throw new NullPointerException("The returned IP is null"); + } + } catch (Exception e) { + logger.warn("Unable to retrieve client IP address", e); + } + // return a safe value instead of null + return ""; + } + + public static String getHostname(SocketAddress socketAddress) { + try { + InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; + String hostname = inetSocketAddress.getHostName(); + if (hostname != null) { + return hostname; + } else { + throw new NullPointerException("The returned hostname is null"); + } + } catch (Exception e) { + logger.warn("Unable to retrieve client hostname", e); + } + // return a safe value instead of null + return ""; + } + public SyslogUtils() { this(false); } http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java index f132152..726c0b6 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java @@ -64,6 +64,8 @@ import org.junit.Test; import org.mockito.Mockito; import org.mockito.internal.util.reflection.Whitebox; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.*; import javax.net.SocketFactory; @@ -132,6 +134,7 @@ public class TestMultiportSyslogTCPSource { Context context = new Context(); context.put(SyslogSourceConfigurationConstants.CONFIG_PORTS, ports.toString().trim()); + context.put("portHeader", "port"); context.putAll(additionalContext.getParameters()); source.configure(context); source.start(); @@ -257,10 +260,8 @@ public class TestMultiportSyslogTCPSource { Map<String, String> headers = e.getHeaders(); // rely on port to figure out which event it is Integer port = null; - if (headers.containsKey( - SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER)) { - port = Integer.parseInt(headers.get( - SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER)); + if (headers.containsKey("port")) { + port = Integer.parseInt(headers.get("port")); } iter.remove(); @@ -311,12 +312,10 @@ public class TestMultiportSyslogTCPSource { parsedLine.buffer.getString(Charsets.UTF_8.newDecoder())); parsedLine.buffer.rewind(); - MultiportSyslogTCPSource.MultiportSyslogHandler handler = - new MultiportSyslogTCPSource.MultiportSyslogHandler(maxLen, 100, null, - null, SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER, + MultiportSyslogHandler handler = new MultiportSyslogHandler( + maxLen, 100, null, null, null, null, null, new ThreadSafeDecoder(Charsets.UTF_8), - new ConcurrentHashMap<Integer, ThreadSafeDecoder>(), - null); + new ConcurrentHashMap<Integer, ThreadSafeDecoder>(),null); Event event = handler.parseEvent(parsedLine, Charsets.UTF_8.newDecoder()); String body = new String(event.getBody(), Charsets.UTF_8); @@ -340,10 +339,9 @@ public class TestMultiportSyslogTCPSource { // defaults to UTF-8 MultiportSyslogHandler handler = new MultiportSyslogHandler( 1000, 10, new ChannelProcessor(new ReplicatingChannelSelector()), - new SourceCounter("test"), "port", + new SourceCounter("test"), null, null, null, new ThreadSafeDecoder(Charsets.UTF_8), - new ConcurrentHashMap<Integer, ThreadSafeDecoder>(), - null); + new ConcurrentHashMap<Integer, ThreadSafeDecoder>(),null); ParsedBuffer parsedBuf = new ParsedBuffer(); parsedBuf.incomplete = false; @@ -393,10 +391,9 @@ public class TestMultiportSyslogTCPSource { // defaults to UTF-8 MultiportSyslogHandler handler = new MultiportSyslogHandler( 1000, 10, new ChannelProcessor(new ReplicatingChannelSelector()), - new SourceCounter("test"), "port", + new SourceCounter("test"), null, null, null, new ThreadSafeDecoder(Charsets.UTF_8), - new ConcurrentHashMap<Integer, ThreadSafeDecoder>(), - null); + new ConcurrentHashMap<Integer, ThreadSafeDecoder>(), null); handler.exceptionCaught(null, new RuntimeException("dummy")); SourceCounter sc = (SourceCounter) Whitebox.getInternalState(handler, "sourceCounter"); @@ -460,9 +457,8 @@ public class TestMultiportSyslogTCPSource { // defaults to UTF-8 MultiportSyslogHandler handler = new MultiportSyslogHandler( - 1000, 10, chanProc, new SourceCounter("test"), "port", - new ThreadSafeDecoder(Charsets.UTF_8), portCharsets, - null); + 1000, 10, chanProc, new SourceCounter("test"), null, null, null, + new ThreadSafeDecoder(Charsets.UTF_8), portCharsets, null); // initialize buffers handler.sessionCreated(session1); @@ -532,4 +528,52 @@ public class TestMultiportSyslogTCPSource { source.stop(); } + @Test + public void testClientHeaders() throws IOException { + String testClientIPHeader = "testClientIPHeader"; + String testClientHostnameHeader = "testClientHostnameHeader"; + + MultiportSyslogTCPSource source = new MultiportSyslogTCPSource(); + Channel channel = new MemoryChannel(); + + Configurables.configure(channel, new Context()); + + List<Channel> channels = Lists.newArrayList(); + channels.add(channel); + + ChannelSelector rcs = new ReplicatingChannelSelector(); + rcs.setChannels(channels); + + source.setChannelProcessor(new ChannelProcessor(rcs)); + int port = getFreePort(); + Context context = new Context(); + context.put("host", InetAddress.getLoopbackAddress().getHostAddress()); + context.put("ports", String.valueOf(port)); + context.put("clientIPHeader", testClientIPHeader); + context.put("clientHostnameHeader", testClientHostnameHeader); + + source.configure(context); + source.start(); + + //create a socket to send a test event + Socket syslogSocket = new Socket(InetAddress.getLoopbackAddress().getHostAddress(), port); + syslogSocket.getOutputStream().write(getEvent(0)); + + Event e = takeEvent(channel); + + source.stop(); + + Map<String, String> headers = e.getHeaders(); + + checkHeader(headers, testClientIPHeader, InetAddress.getLoopbackAddress().getHostAddress()); + checkHeader(headers, testClientHostnameHeader, InetAddress.getLoopbackAddress().getHostName()); + } + + private static void checkHeader(Map<String, String> headers, String headerName, + String expectedValue) { + assertTrue("Missing event header: " + headerName, headers.containsKey(headerName)); + assertEquals("Event header value does not match: " + headerName, + expectedValue, headers.get(headerName)); + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java index 9398707..057aef5 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java @@ -37,13 +37,16 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStream; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doThrow; @@ -86,6 +89,8 @@ public class TestSyslogTcpSource { rcs.setChannels(channels); source.setChannelProcessor(new ChannelProcessor(rcs)); + + context.put("host", InetAddress.getLoopbackAddress().getHostAddress()); context.put("port", String.valueOf(TEST_SYSLOG_PORT)); context.put("keepFields", keepFields); @@ -264,5 +269,49 @@ public class TestSyslogTcpSource { } + + @Test + public void testClientHeaders() throws IOException { + String testClientIPHeader = "testClientIPHeader"; + String testClientHostnameHeader = "testClientHostnameHeader"; + + Context context = new Context(); + context.put("clientIPHeader", testClientIPHeader); + context.put("clientHostnameHeader", testClientHostnameHeader); + + init("none", context); + + source.start(); + // Write some message to the syslog port + InetSocketAddress addr = source.getBoundAddress(); + Socket syslogSocket = new Socket(addr.getAddress(), addr.getPort()); + syslogSocket.getOutputStream().write(bodyWithTandH.getBytes()); + + Transaction txn = channel.getTransaction(); + txn.begin(); + Event e = channel.take(); + + try { + txn.commit(); + } catch (Throwable t) { + txn.rollback(); + } finally { + txn.close(); + } + + source.stop(); + + Map<String, String> headers = e.getHeaders(); + + checkHeader(headers, testClientIPHeader, InetAddress.getLoopbackAddress().getHostAddress()); + checkHeader(headers, testClientHostnameHeader, InetAddress.getLoopbackAddress().getHostName()); + } + + private static void checkHeader(Map<String, String> headers, String headerName, + String expectedValue) { + assertTrue("Missing event header: " + headerName, headers.containsKey(headerName)); + assertEquals("Event header value does not match: " + headerName, + expectedValue, headers.get(headerName)); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java index 76c5759..a96140a 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java @@ -42,7 +42,10 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doThrow; @@ -64,6 +67,10 @@ public class TestSyslogUdpSource { data1; private void init(String keepFields) { + init(keepFields, new Context()); + } + + private void init(String keepFields, Context context) { source = new SyslogUDPSource(); channel = new MemoryChannel(); @@ -76,7 +83,7 @@ public class TestSyslogUdpSource { rcs.setChannels(channels); source.setChannelProcessor(new ChannelProcessor(rcs)); - Context context = new Context(); + context.put("host", InetAddress.getLoopbackAddress().getHostAddress()); context.put("port", String.valueOf(TEST_SYSLOG_PORT)); context.put("keepFields", keepFields); @@ -266,5 +273,43 @@ public class TestSyslogUdpSource { } return payload.toString(); } + + @Test + public void testClientHeaders() throws IOException { + String testClientIPHeader = "testClientIPHeader"; + String testClientHostnameHeader = "testClientHostnameHeader"; + + + Context context = new Context(); + context.put("clientIPHeader", testClientIPHeader); + context.put("clientHostnameHeader", testClientHostnameHeader); + + init("none", context); + + source.start(); + + DatagramPacket datagramPacket = createDatagramPacket(bodyWithTandH.getBytes()); + sendDatagramPacket(datagramPacket); + + Transaction txn = channel.getTransaction(); + txn.begin(); + Event e = channel.take(); + + commitAndCloseTransaction(txn); + + source.stop(); + + Map<String, String> headers = e.getHeaders(); + + checkHeader(headers, testClientIPHeader, InetAddress.getLoopbackAddress().getHostAddress()); + checkHeader(headers, testClientHostnameHeader, InetAddress.getLoopbackAddress().getHostName()); + } + + private static void checkHeader(Map<String, String> headers, String headerName, + String expectedValue) { + assertTrue("Missing event header: " + headerName, headers.containsKey(headerName)); + assertEquals("Event header value does not match: " + headerName, + expectedValue, headers.get(headerName)); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java index 2479413..851290d 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java @@ -18,12 +18,16 @@ */ package org.apache.flume.source; +import static org.junit.Assert.assertEquals; + import org.apache.flume.Event; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.junit.Assert; import org.junit.Test; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Clock; @@ -596,4 +600,58 @@ public class TestSyslogUtils { checkHeader("true", msg1, stamp1 + "+0800", format1, host1, data5); } + @Test + public void testGetIPWhenSuccessful() { + SocketAddress socketAddress = new InetSocketAddress("localhost", 2000); + + String ip = SyslogUtils.getIP(socketAddress); + + assertEquals("127.0.0.1", ip); + } + + @Test + public void testGetIPWhenInputIsNull() { + SocketAddress socketAddress = null; + + String ip = SyslogUtils.getIP(socketAddress); + + assertEquals("", ip); + } + + @Test + public void testGetIPWhenInputIsNotInetSocketAddress() { + SocketAddress socketAddress = new SocketAddress() {}; + + String ip = SyslogUtils.getIP(socketAddress); + + assertEquals("", ip); + } + + @Test + public void testGetHostnameWhenSuccessful() { + SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 2000); + + String hostname = SyslogUtils.getHostname(socketAddress); + + assertEquals("localhost", hostname); + } + + @Test + public void testGetHostnameWhenInputIsNull() { + SocketAddress socketAddress = null; + + String hostname = SyslogUtils.getHostname(socketAddress); + + assertEquals("", hostname); + } + + @Test + public void testGetHostnameWhenInputIsNotInetSocketAddress() { + SocketAddress socketAddress = new SocketAddress() {}; + + String hostname = SyslogUtils.getHostname(socketAddress); + + assertEquals("", hostname); + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 1f244c5..6939b59 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1809,6 +1809,20 @@ keepFields none Setting this to 'all' will preserve the Prior fields can be included: priority, version, timestamp, hostname. The values 'true' and 'false' have been deprecated in favor of 'all' and 'none'. +clientIPHeader -- If specified, the IP address of the client will be stored in + the header of each event using the header name specified here. + This allows for interceptors and channel selectors to customize + routing logic based on the IP address of the client. + Do not use the standard Syslog header names here (like _host_) + because the event header will be overridden in that case. +clientHostnameHeader -- If specified, the host name of the client will be stored in + the header of each event using the header name specified here. + This allows for interceptors and channel selectors to customize + routing logic based on the host name of the client. + Retrieving the host name may involve a name service reverse + lookup which may affect the performance. + Do not use the standard Syslog header names here (like _host_) + because the event header will be overridden in that case. selector.type replicating or multiplexing selector.* replicating Depends on the selector.type value interceptors -- Space-separated list of interceptors @@ -1875,6 +1889,20 @@ keepFields none Setting this to 'all' will preserve the timestamp, hostname. The values 'true' and 'false' have been deprecated in favor of 'all' and 'none'. portHeader -- If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port. +clientIPHeader -- If specified, the IP address of the client will be stored in + the header of each event using the header name specified here. + This allows for interceptors and channel selectors to customize + routing logic based on the IP address of the client. + Do not use the standard Syslog header names here (like _host_) + because the event header will be overridden in that case. +clientHostnameHeader -- If specified, the host name of the client will be stored in + the header of each event using the header name specified here. + This allows for interceptors and channel selectors to customize + routing logic based on the host name of the client. + Retrieving the host name may involve a name service reverse + lookup which may affect the performance. + Do not use the standard Syslog header names here (like _host_) + because the event header will be overridden in that case. charset.default UTF-8 Default character set used while parsing syslog events into strings. charset.port.<port> -- Character set is configurable on a per-port basis. batchSize 100 Maximum number of events to attempt to process per request loop. Using the default is usually fine. @@ -1923,20 +1951,34 @@ For example, a multiport syslog TCP source for agent named a1: Syslog UDP Source ''''''''''''''''' -============== =========== ============================================== -Property Name Default Description -============== =========== ============================================== -**channels** -- -**type** -- The component type name, needs to be ``syslogudp`` -**host** -- Host name or IP address to bind to -**port** -- Port # to bind to -keepFields false Setting this to true will preserve the Priority, - Timestamp and Hostname in the body of the event. -selector.type replicating or multiplexing -selector.* replicating Depends on the selector.type value -interceptors -- Space-separated list of interceptors +==================== =========== ================================================================ +Property Name Default Description +==================== =========== ================================================================ +**channels** -- +**type** -- The component type name, needs to be ``syslogudp`` +**host** -- Host name or IP address to bind to +**port** -- Port # to bind to +keepFields false Setting this to true will preserve the Priority, + Timestamp and Hostname in the body of the event. +clientIPHeader -- If specified, the IP address of the client will be stored in + the header of each event using the header name specified here. + This allows for interceptors and channel selectors to customize + routing logic based on the IP address of the client. + Do not use the standard Syslog header names here (like _host_) + because the event header will be overridden in that case. +clientHostnameHeader -- If specified, the host name of the client will be stored in + the header of each event using the header name specified here. + This allows for interceptors and channel selectors to customize + routing logic based on the host name of the client. + Retrieving the host name may involve a name service reverse + lookup which may affect the performance. + Do not use the standard Syslog header names here (like _host_) + because the event header will be overridden in that case. +selector.type replicating or multiplexing +selector.* replicating Depends on the selector.type value +interceptors -- Space-separated list of interceptors interceptors.* -============== =========== ============================================== +==================== =========== ================================================================= For example, a syslog UDP source for agent named a1:
