V Paul created FLUME-2249:
-----------------------------

             Summary: Interceptors is failing to modify/drop events with log4j 
appender and avro source.
                 Key: FLUME-2249
                 URL: https://issues.apache.org/jira/browse/FLUME-2249
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: v1.4.0, v1.3.0
         Environment: Linux Mint 14 Nadia (based on Ubuntu 12.10, Quantal 
Quetzal)
apache-flume-1.4.0
apache-flume-1.3.0
log4j-1.2.16
flume-ng-log4jappender-1.4.0-jar-with-dependencies
            Reporter: V Paul


tested with Regex Filtering Interceptor, Static Interceptor. There is a bug for 
Regex Extractor Interceptor - FLUME-2041.

h5. Configuration:
h6. log4j.properties:
{noformat}
# Root logger option
log4j.rootLogger=ALL, stdout
#
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p 
%c{1}:%L - %m%n
#
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = localhost
log4j.appender.flume.Port = 41414

# configure a class's logger to output to the flume appender
log4j.logger.generator.LogGenerator = INFO,flume

log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p 
%c{1}:%L - %m%n
{noformat}
h6. secondExample.conf:
{noformat}
# Flume test file
# Listens via Avro RPC on port 41414 and dumps data received to the log

agent.channels = ch-1
agent.sources = src-1
agent.sinks = sink-1

agent.channels.ch-1.type = memory
agent.channels.ch-1.capacity = 10000000
agent.channels.ch-1.transactionCapacity = 1000

agent.sources.src-1.type = avro
agent.sources.src-1.channels = ch-1
agent.sources.src-1.bind = localhost
agent.sources.src-1.port = 41414

agent.sources.src-1.interceptors = intrcptr
agent.sources.src-1.interceptors.intrcptr.type = regex_filter
agent.sources.src-1.interceptors.intrcptr.regex = ERROR [0-4]:

agent.sinks.sink-1.type = logger
agent.sinks.sink-1.channel = ch-1
{noformat}
start command:
{noformat}
bin/flume-ng agent --conf conf --conf-file secondExample.conf --name agent 
-Dflume.root.logger=ALL,console
{noformat}
----
h5. Result:
Logs seems to be OK:
{noformat}
SOURCES: {src-1={ parameters:{port=41414, interceptors.intrcptr.regex=ERROR 
[0-4]:, interceptors=intrcptr, channels=ch-1, type=avro, bind=localhost, 
interceptors.intrcptr.type=regex_filter} }}
CHANNELS: {ch-1={ parameters:{transactionCapacity=1000, capacity=10000000, 
type=memory} }}
AgentConfiguration created with Configuration stubs for which full validation 
was performed[agent]
SINKS: {sink-1=ComponentConfiguration[sink-1]
  CONFIG: 
    CHANNEL:ch-1
}

2013-11-22 23:41:48,689 (conf-file-poller-0) [DEBUG - 
org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:135)]
 Channels:ch-1

2013-11-22 23:41:48,689 (conf-file-poller-0) [DEBUG - 
org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)]
 Sinks sink-1

2013-11-22 23:41:48,690 (conf-file-poller-0) [DEBUG - 
org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)]
 Sources src-1

2013-11-22 23:41:48,690 (conf-file-poller-0) [INFO - 
org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)]
 Post-validation flume configuration contains configuration for agents: [agent]
2013-11-22 23:41:48,693 (conf-file-poller-0) [INFO - 
org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:150)]
 Creating channels
2013-11-22 23:41:48,706 (conf-file-poller-0) [INFO - 
org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)]
 Creating instance of channel ch-1 type memory
2013-11-22 23:41:48,717 (conf-file-poller-0) [INFO - 
org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)]
 Created channel ch-1
2013-11-22 23:41:48,725 (conf-file-poller-0) [INFO - 
org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:39)]
 Creating instance of source src-1, type avro
2013-11-22 23:41:48,779 (conf-file-poller-0) [INFO - 
org.apache.flume.interceptor.RegexFilteringInterceptor$Builder.build(RegexFilteringInterceptor.java:160)]
 Creating RegexFilteringInterceptor: regex=ERROR [0-4]:,excludeEvents=false
2013-11-22 23:41:48,782 (conf-file-poller-0) [INFO - 
org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] 
Creating instance of sink: sink-1, type: logger
2013-11-22 23:41:48,787 (conf-file-poller-0) [INFO - 
org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:119)]
 Channel ch-1 connected to [src-1, sink-1]
2013-11-22 23:41:48,794 (conf-file-poller-0) [INFO - 
org.apache.flume.node.Application.startAllComponents(Application.java:138)] 
Starting new configuration:{ sourceRunners:{src-1=EventDrivenSourceRunner: { 
source:Avro source src-1: { bindAddress: localhost, port: 41414 } }} 
sinkRunners:{sink-1=SinkRunner: { 
policy:org.apache.flume.sink.DefaultSinkProcessor@12679f2 counterGroup:{ 
name:null counters:{} } }} 
channels:{ch-1=org.apache.flume.channel.MemoryChannel{name: ch-1}} }
2013-11-22 23:41:48,795 (conf-file-poller-0) [INFO - 
org.apache.flume.node.Application.startAllComponents(Application.java:145)] 
Starting Channel ch-1
2013-11-22 23:41:48,881 (lifecycleSupervisor-1-0) [INFO - 
org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:110)]
 Monitoried counter group for type: CHANNEL, name: ch-1, registered 
successfully.
2013-11-22 23:41:48,882 (lifecycleSupervisor-1-0) [INFO - 
org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:94)]
 Component type: CHANNEL, name: ch-1 started
2013-11-22 23:41:48,882 (conf-file-poller-0) [INFO - 
org.apache.flume.node.Application.startAllComponents(Application.java:173)] 
Starting Sink sink-1
2013-11-22 23:41:48,884 (conf-file-poller-0) [INFO - 
org.apache.flume.node.Application.startAllComponents(Application.java:184)] 
Starting Source src-1
2013-11-22 23:41:48,885 (lifecycleSupervisor-1-3) [INFO - 
org.apache.flume.source.AvroSource.start(AvroSource.java:192)] Starting Avro 
source src-1: { bindAddress: localhost, port: 41414 }...
2013-11-22 23:41:48,922 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG 
- org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling 
sink runner starting
2013-11-22 23:41:49,442 (lifecycleSupervisor-1-3) [INFO - 
org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:110)]
 Monitoried counter group for type: SOURCE, name: src-1, registered 
successfully.
2013-11-22 23:41:49,448 (lifecycleSupervisor-1-3) [INFO - 
org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:94)]
 Component type: SOURCE, name: src-1 started
2013-11-22 23:41:49,450 (lifecycleSupervisor-1-3) [INFO - 
org.apache.flume.source.AvroSource.start(AvroSource.java:217)] Avro source 
src-1 started.
{noformat}
So it is expected that events containing text from "ERROR 0:" to "ERROR 4:" 
should be passed to the sink, and all other events should be dropped. But _no 
events are dropped_ :
{noformat}
2013-11-22 23:56:07,044 (pool-3-thread-1) [INFO - 
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)]
 [id: 0x00297e0d, /127.0.0.1:40355 => /127.0.0.1:41414] OPEN
2013-11-22 23:56:07,048 (pool-4-thread-1) [INFO - 
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)]
 [id: 0x00297e0d, /127.0.0.1:40355 => /127.0.0.1:41414] BOUND: /127.0.0.1:41414
2013-11-22 23:56:07,048 (pool-4-thread-1) [INFO - 
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)]
 [id: 0x00297e0d, /127.0.0.1:40355 => /127.0.0.1:41414] CONNECTED: 
/127.0.0.1:40355
2013-11-22 23:56:07,640 (pool-4-thread-1) [DEBUG - 
org.apache.flume.source.AvroSource.append(AvroSource.java:300)] Avro source 
src-1: Received avro event: {"headers": {"flume.client.log4j.log.level": 
"40000", "flume.client.log4j.logger.name": "generator.LogGenerator", 
"flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": 
"1385157367544"}, "body": {"bytes": "2013-11-22 23:56:07 ERROR LogGenerator:33 
- ERROR 4: 3735f28e-afee-48df-b60d-b3dbe9c0956f
"}}
2013-11-22 23:56:07,713 (pool-4-thread-1) [DEBUG - 
org.apache.flume.source.AvroSource.append(AvroSource.java:300)] Avro source 
src-1: Received avro event: {"headers": {"flume.client.log4j.log.level": 
"40000", "flume.client.log4j.logger.name": "generator.LogGenerator", 
"flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": 
"1385157367712"}, "body": {"bytes": "2013-11-22 23:56:07 ERROR LogGenerator:33 
- ERROR 3: f0e9cba1-095a-428e-bdab-1d7022dd449c
"}}
2013-11-22 23:56:07,719 (pool-4-thread-1) [DEBUG - 
org.apache.flume.source.AvroSource.append(AvroSource.java:300)] Avro source 
src-1: Received avro event: {"headers": {"flume.client.log4j.log.level": 
"30000", "flume.client.log4j.logger.name": "generator.LogGenerator", 
"flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": 
"1385157367717"}, "body": {"bytes": "2013-11-22 23:56:07 WARN  LogGenerator:33 
- ERROR 7: fde1ffce-6668-4364-a68e-afd015865be9
"}}
2013-11-22 23:56:07,739 (pool-4-thread-1) [DEBUG - 
org.apache.flume.source.AvroSource.append(AvroSource.java:300)] Avro source 
src-1: Received avro event: {"headers": {"flume.client.log4j.log.level": 
"20000", "flume.client.log4j.logger.name": "generator.LogGenerator", 
"flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": 
"1385157367730"}, "body": {"bytes": "2013-11-22 23:56:07 INFO  LogGenerator:33 
- ERROR 2: 3ea3b3ec-c23a-4dc2-b8d1-3643c46ac15e
"}}
2013-11-22 23:56:07,742 (pool-4-thread-1) [DEBUG - 
org.apache.flume.source.AvroSource.append(AvroSource.java:300)] Avro source 
src-1: Received avro event: {"headers": {"flume.client.log4j.log.level": 
"30000", "flume.client.log4j.logger.name": "generator.LogGenerator", 
"flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": 
"1385157367741"}, "body": {"bytes": "2013-11-22 23:56:07 WARN  LogGenerator:33 
- ERROR 9: 8682e837-60d0-4c81-9c1b-970ac23af700
"}}
2013-11-22 23:56:07,752 (pool-4-thread-1) [DEBUG - 
org.apache.flume.source.AvroSource.append(AvroSource.java:300)] Avro source 
src-1: Received avro event: {"headers": {"flume.client.log4j.log.level": 
"30000", "flume.client.log4j.logger.name": "generator.LogGenerator", 
"flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": 
"1385157367751"}, "body": {"bytes": "2013-11-22 23:56:07 WARN  LogGenerator:33 
- ERROR 4: a089b140-8f81-45c2-88d7-435f34f8c681
"}}
2013-11-22 23:56:07,754 (pool-4-thread-1) [DEBUG - 
org.apache.flume.source.AvroSource.append(AvroSource.java:300)] Avro source 
src-1: Received avro event: {"headers": {"flume.client.log4j.log.level": 
"20000", "flume.client.log4j.logger.name": "generator.LogGenerator", 
"flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": 
"1385157367753"}, "body": {"bytes": "2013-11-22 23:56:07 INFO  LogGenerator:33 
- ERROR 7: 57c35064-a6b1-4a9e-afc1-166091ffb68e
"}}
2013-11-22 23:56:07,791 (pool-4-thread-1) [INFO - 
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)]
 [id: 0x00297e0d, /127.0.0.1:40355 :> /127.0.0.1:41414] DISCONNECTED
2013-11-22 23:56:07,792 (pool-4-thread-1) [INFO - 
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)]
 [id: 0x00297e0d, /127.0.0.1:40355 :> /127.0.0.1:41414] UNBOUND
2013-11-22 23:56:07,792 (pool-4-thread-1) [INFO - 
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)]
 [id: 0x00297e0d, /127.0.0.1:40355 :> /127.0.0.1:41414] CLOSED
2013-11-22 23:56:07,792 (pool-4-thread-1) [INFO - 
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)]
 Connection to /127.0.0.1:40355 disconnected.
{noformat}
----
*Please note* that this bug is reproducible if Flume is configured like this:
bq. Log4jAppender -> Avro source -> Regex interceptor -> Logger Sink
As it was mentioned in FLUME-2041 it works as expected if Flume is configured 
with two agents like this:
bq. Log4jAppender -> Avro source -> Avro sink -> Avro source -> Regex 
interceptor  -> Logger Sink



--
This message was sent by Atlassian JIRA
(v6.1#6144)

Reply via email to