Repository: flume Updated Branches: refs/heads/trunk 72a68f0d4 -> de941e7b8
FLUME-2171. Add Interceptor to remove headers from event Similar to Flume OG decorators, this introduces an Interceptor to remove headers from an event. Reviewers: Bessenyei Balázs Donát, Jeff Holoman, Denes Arvay, Attila Simon (Gabriel Commeau and Bessenyei Balázs Donát via Jeff Holoman) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/de941e7b Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/de941e7b Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/de941e7b Branch: refs/heads/trunk Commit: de941e7b85ed55bfbd804880f421be3889187d31 Parents: 72a68f0 Author: Gabriel Commeau <[email protected]> Authored: Sun Nov 20 17:34:48 2016 -0500 Committer: jholoman <[email protected]> Committed: Sun Nov 20 17:39:07 2016 -0500 ---------------------------------------------------------------------- .../flume/interceptor/InterceptorType.java | 3 +- .../interceptor/RemoveHeaderInterceptor.java | 182 ++++++++++++++ .../RemoveHeaderInterceptorTest.java | 249 +++++++++++++++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 19 ++ 4 files changed, 452 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/de941e7b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java index fe341e9..86a0798 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java +++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java @@ -26,11 +26,12 @@ public enum InterceptorType { REGEX_FILTER( org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class), 
REGEX_EXTRACTOR(org.apache.flume.interceptor.RegexExtractorInterceptor.Builder.class), + REMOVE_HEADER(org.apache.flume.interceptor.RemoveHeaderInterceptor.Builder.class), SEARCH_REPLACE(org.apache.flume.interceptor.SearchAndReplaceInterceptor.Builder.class); private final Class<? extends Interceptor.Builder> builderClass; - private InterceptorType(Class<? extends Interceptor.Builder> builderClass) { + InterceptorType(Class<? extends Interceptor.Builder> builderClass) { this.builderClass = builderClass; } http://git-wip-us.apache.org/repos/asf/flume/blob/de941e7b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RemoveHeaderInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RemoveHeaderInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RemoveHeaderInterceptor.java new file mode 100644 index 0000000..7e6f80b --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RemoveHeaderInterceptor.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flume.interceptor; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.conf.LogPrivacyUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * This interceptor manipulates Flume event headers, by removing one or many + * headers. It can remove a statically defined header, headers based on a + * regular expression or headers in a list. If none of these is defined, or if + * no header matches the criteria, the Flume events are not modified.<br /> + * Note that if only one header needs to be removed, specifying it by name + * provides performance benefits over the other two methods.<br /> + * <br /> + * Properties:<br /> + * - .withName (optional): name of the header to remove<br /> + * - .fromList (optional): list of headers to remove, separated with the + * separator specified with .fromListSeparator<br /> + * - .fromListSeparator (optional, default {@code \s*,\s*}): regular expression used to separate + * multiple header names in the list specified using .fromList<br /> + * - .matching (optional): All the headers whose names match this regular expression are + * removed + */ +public class RemoveHeaderInterceptor implements Interceptor { + static final String WITH_NAME = "withName"; + static final String FROM_LIST = "fromList"; + static final String LIST_SEPARATOR = "fromListSeparator"; + static final String LIST_SEPARATOR_DEFAULT = "\\s*,\\s*"; + static final String MATCH_REGEX = "matching"; + private static final Logger LOG = LoggerFactory + .getLogger(RemoveHeaderInterceptor.class); + private final String withName; + private final Set<String> fromList; + private final Pattern matchRegex; + + /** + * Only {@link RemoveHeaderInterceptor.Builder} is expected to create instances.
+ */ + private RemoveHeaderInterceptor(final String withName, final String fromList, + final String listSeparator, final Pattern matchRegex) { + this.withName = withName; + assert fromList == null || listSeparator != null : "Default value used otherwise"; + this.fromList = (fromList != null) ? new HashSet<>(Arrays.asList(fromList.split( + listSeparator))) : null; + this.matchRegex = matchRegex; + } + + /** + * @see org.apache.flume.interceptor.Interceptor#initialize() + */ + @Override + public void initialize() { + // Nothing to do + } + + /** + * @see org.apache.flume.interceptor.Interceptor#close() + */ + @Override + public void close() { + // Nothing to do + } + + /** + * @see org.apache.flume.interceptor.Interceptor#intercept(java.util.List) + */ + @Override + public List<Event> intercept(final List<Event> events) { + for (final Event event : events) { + intercept(event); + } + return events; + } + + /** + * @see org.apache.flume.interceptor.Interceptor#intercept(org.apache.flume.Event) + */ + @Override + public Event intercept(final Event event) { + assert event != null : "Missing Flume event while intercepting"; + try { + final Map<String, String> headers = event.getHeaders(); + // Remove withName directly; log the event only if raw data logging is allowed + final boolean removedByName = withName != null && headers.remove(withName) != null; + if (removedByName && LogPrivacyUtil.allowLogRawData()) { + LOG.trace("Removed header \"{}\" for event: {}", withName, event); + } + // Also remove headers listed in fromList or matching the regex + if (fromList != null || matchRegex != null) { + final Iterator<String> headerIterator = headers.keySet().iterator(); + List<String> removedHeaders = new LinkedList<>(); + while (headerIterator.hasNext()) { + final String currentHeader = headerIterator.next(); + if (fromList != null && fromList.contains(currentHeader)) { + headerIterator.remove(); + removedHeaders.add(currentHeader); + } else if (matchRegex != null) { + final Matcher matcher = matchRegex.matcher(currentHeader); + if (matcher.matches()) { + headerIterator.remove(); + removedHeaders.add(currentHeader); + } + } + } + if
(!removedHeaders.isEmpty() && LogPrivacyUtil.allowLogRawData()) { + LOG.trace("Removed headers \"{}\" for event: {}", removedHeaders, event); + } + } + } catch (final Exception e) { + LOG.error("Failed to process event " + event, e); + } + return event; + } + + /** + * Builder which builds new instances of the {@link RemoveHeaderInterceptor}. + */ + public static class Builder implements Interceptor.Builder { + String withName; + String fromList; + String listSeparator; + Pattern matchRegex; + + /** + * @see org.apache.flume.interceptor.Interceptor.Builder#build() + */ + @Override + public Interceptor build() { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating RemoveHeaderInterceptor with: withName={}, fromList={}, " + + "listSeparator={}, matchRegex={}", new String[] {withName, fromList, listSeparator, + String.valueOf(matchRegex)}); + } + return new RemoveHeaderInterceptor(withName, fromList, listSeparator, + matchRegex); + } + + /** + * @see org.apache.flume.conf.Configurable#configure(org.apache.flume.Context) + */ + @Override + public void configure(final Context context) { + withName = context.getString(WITH_NAME); + fromList = context.getString(FROM_LIST); + listSeparator = context.getString(LIST_SEPARATOR, + LIST_SEPARATOR_DEFAULT); + final String matchRegexStr = context.getString(MATCH_REGEX); + // Reset when absent so a pattern from an earlier configure() call does not linger + matchRegex = (matchRegexStr != null) ? Pattern.compile(matchRegexStr) : null; + } + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/de941e7b/flume-ng-core/src/test/java/org/apache/flume/interceptor/RemoveHeaderInterceptorTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/RemoveHeaderInterceptorTest.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/RemoveHeaderInterceptorTest.java new file mode 100644 index 0000000..219ca2d --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/RemoveHeaderInterceptorTest.java @@ -0,0
+1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.interceptor; + +import com.google.common.collect.ImmutableMap; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.regex.PatternSyntaxException; + +public class RemoveHeaderInterceptorTest { + private static final String HEADER1 = "my-header10"; + private static final String HEADER2 = "my-header11"; + private static final String HEADER3 = "my-header12"; + private static final String HEADER4 = "my-header20"; + private static final String HEADER5 = "my-header21"; + private static final String DEFAULT_SEPARATOR = ", "; + private static final String MY_SEPARATOR = ";"; + + private Event buildEventWithHeader() { + return EventBuilder.withBody("My test event".getBytes(), ImmutableMap.of( + HEADER1, HEADER1, HEADER2, HEADER2, HEADER3, HEADER3, HEADER4, HEADER4, + HEADER5, HEADER5)); + } + + private Event buildEventWithoutHeader() { + return EventBuilder.withBody("My test event".getBytes()); + } + + @Test(expected = PatternSyntaxException.class) +
public void testBadConfig() throws Exception { + new RemoveHeaderIntBuilder().fromList(HEADER1, "(").build(); + } + + @Test + public void testWithName() throws IllegalAccessException, ClassNotFoundException, + InstantiationException { + final Interceptor removeHeaderInterceptor = new RemoveHeaderIntBuilder() + .withName(HEADER4).build(); + final Event event1 = buildEventWithHeader(); + Assert.assertEquals(HEADER1, event1.getHeaders().get(HEADER1)); + Assert.assertEquals(HEADER2, event1.getHeaders().get(HEADER2)); + Assert.assertEquals(HEADER3, event1.getHeaders().get(HEADER3)); + Assert.assertEquals(HEADER4, event1.getHeaders().get(HEADER4)); + Assert.assertEquals(HEADER5, event1.getHeaders().get(HEADER5)); + removeHeaderInterceptor.intercept(event1); + Assert.assertEquals(HEADER1, event1.getHeaders().get(HEADER1)); + Assert.assertEquals(HEADER2, event1.getHeaders().get(HEADER2)); + Assert.assertEquals(HEADER3, event1.getHeaders().get(HEADER3)); + Assert.assertNull(event1.getHeaders().get(HEADER4)); + Assert.assertEquals(HEADER5, event1.getHeaders().get(HEADER5)); + + final Event event2 = buildEventWithoutHeader(); + Assert.assertTrue(event2.getHeaders().isEmpty()); + removeHeaderInterceptor.intercept(event2); + Assert.assertTrue(event2.getHeaders().isEmpty()); + } + + @Test + public void testFromListWithDefaultSeparator1() throws Exception { + final Interceptor removeHeaderInterceptor = new RemoveHeaderIntBuilder() + .fromList(HEADER4 + MY_SEPARATOR + HEADER2).build(); + final Event event1 = buildEventWithHeader(); + Assert.assertEquals(HEADER1, event1.getHeaders().get(HEADER1)); + Assert.assertEquals(HEADER2, event1.getHeaders().get(HEADER2)); + Assert.assertEquals(HEADER3, event1.getHeaders().get(HEADER3)); + Assert.assertEquals(HEADER4, event1.getHeaders().get(HEADER4)); + Assert.assertEquals(HEADER5, event1.getHeaders().get(HEADER5)); + removeHeaderInterceptor.intercept(event1); + Assert.assertEquals(HEADER1, event1.getHeaders().get(HEADER1)); +
Assert.assertEquals(HEADER2, event1.getHeaders().get(HEADER2)); + Assert.assertEquals(HEADER3, event1.getHeaders().get(HEADER3)); + Assert.assertEquals(HEADER4, event1.getHeaders().get(HEADER4)); + Assert.assertEquals(HEADER5, event1.getHeaders().get(HEADER5)); + + final Event event2 = buildEventWithoutHeader(); + Assert.assertTrue(event2.getHeaders().isEmpty()); + removeHeaderInterceptor.intercept(event2); + Assert.assertTrue(event2.getHeaders().isEmpty()); + } + + @Test + public void testFromListWithDefaultSeparator2() throws Exception { + final Interceptor removeHeaderInterceptor = new RemoveHeaderIntBuilder() + .fromList(HEADER4 + DEFAULT_SEPARATOR + HEADER2).build(); + final Event event1 = buildEventWithHeader(); + Assert.assertEquals(HEADER1, event1.getHeaders().get(HEADER1)); + Assert.assertEquals(HEADER2, event1.getHeaders().get(HEADER2)); + Assert.assertEquals(HEADER3, event1.getHeaders().get(HEADER3)); + Assert.assertEquals(HEADER4, event1.getHeaders().get(HEADER4)); + Assert.assertEquals(HEADER5, event1.getHeaders().get(HEADER5)); + removeHeaderInterceptor.intercept(event1); + Assert.assertEquals(HEADER1, event1.getHeaders().get(HEADER1)); + Assert.assertNull(event1.getHeaders().get(HEADER2)); + Assert.assertEquals(HEADER3, event1.getHeaders().get(HEADER3)); + Assert.assertNull(event1.getHeaders().get(HEADER4)); + Assert.assertEquals(HEADER5, event1.getHeaders().get(HEADER5)); + + final Event event2 = buildEventWithoutHeader(); + Assert.assertTrue(event2.getHeaders().isEmpty()); + removeHeaderInterceptor.intercept(event2); + Assert.assertTrue(event2.getHeaders().isEmpty()); + } + + @Test + public void testFromListWithCustomSeparator1() throws Exception { + final Interceptor removeHeaderInterceptor = new RemoveHeaderIntBuilder() + .fromList(HEADER4 + MY_SEPARATOR + HEADER2, MY_SEPARATOR).build(); + final Event event1 = buildEventWithHeader(); + Assert.assertEquals(HEADER1, event1.getHeaders().get(HEADER1)); + Assert.assertEquals(HEADER2,
event1.getHeaders().get(HEADER2)); + Assert.assertEquals(HEADER3, event1.getHeaders().get(HEADER3)); + Assert.assertEquals(HEADER4, event1.getHeaders().get(HEADER4)); + Assert.assertEquals(HEADER5, event1.getHeaders().get(HEADER5)); + removeHeaderInterceptor.intercept(event1); + Assert.assertEquals(HEADER1, event1.getHeaders().get(HEADER1)); + Assert.assertNull(event1.getHeaders().get(HEADER2)); + Assert.assertEquals(HEADER3, event1.getHeaders().get(HEADER3)); + Assert.assertNull(event1.getHeaders().get(HEADER4)); + Assert.assertEquals(HEADER5, event1.getHeaders().get(HEADER5)); + + final Event event2 = buildEventWithoutHeader(); + Assert.assertTrue(event2.getHeaders().isEmpty()); + removeHeaderInterceptor.intercept(event2); + Assert.assertTrue(event2.getHeaders().isEmpty()); + } + + @Test + public void testFromListWithCustomSeparator2() throws Exception { + final Interceptor removeHeaderInterceptor = new RemoveHeaderIntBuilder() + .fromList(HEADER4 + DEFAULT_SEPARATOR + HEADER2, MY_SEPARATOR).build(); + final Event event1 = buildEventWithHeader(); + Assert.assertEquals(HEADER1, event1.getHeaders().get(HEADER1)); + Assert.assertEquals(HEADER2, event1.getHeaders().get(HEADER2)); + Assert.assertEquals(HEADER3, event1.getHeaders().get(HEADER3)); + Assert.assertEquals(HEADER4, event1.getHeaders().get(HEADER4)); + Assert.assertEquals(HEADER5, event1.getHeaders().get(HEADER5)); + removeHeaderInterceptor.intercept(event1); + Assert.assertEquals(HEADER1, event1.getHeaders().get(HEADER1)); + Assert.assertEquals(HEADER2, event1.getHeaders().get(HEADER2)); + Assert.assertEquals(HEADER3, event1.getHeaders().get(HEADER3)); + Assert.assertEquals(HEADER4, event1.getHeaders().get(HEADER4)); + Assert.assertEquals(HEADER5, event1.getHeaders().get(HEADER5)); + + final Event event2 = buildEventWithoutHeader(); + Assert.assertTrue(event2.getHeaders().isEmpty()); + removeHeaderInterceptor.intercept(event2); + Assert.assertTrue(event2.getHeaders().isEmpty()); + } + + @Test + public void
testMatchRegex() throws Exception { + final Interceptor removeHeaderInterceptor = new RemoveHeaderIntBuilder() + .matchRegex("my-header1.*").build(); + final Event event1 = buildEventWithHeader(); + Assert.assertEquals(HEADER1, event1.getHeaders().get(HEADER1)); + Assert.assertEquals(HEADER2, event1.getHeaders().get(HEADER2)); + Assert.assertEquals(HEADER3, event1.getHeaders().get(HEADER3)); + Assert.assertEquals(HEADER4, event1.getHeaders().get(HEADER4)); + Assert.assertEquals(HEADER5, event1.getHeaders().get(HEADER5)); + removeHeaderInterceptor.intercept(event1); + Assert.assertNull(event1.getHeaders().get(HEADER1)); + Assert.assertNull(event1.getHeaders().get(HEADER2)); + Assert.assertNull(event1.getHeaders().get(HEADER3)); + Assert.assertEquals(HEADER4, event1.getHeaders().get(HEADER4)); + Assert.assertEquals(HEADER5, event1.getHeaders().get(HEADER5)); + + final Event event2 = buildEventWithoutHeader(); + Assert.assertTrue(event2.getHeaders().isEmpty()); + removeHeaderInterceptor.intercept(event2); + Assert.assertTrue(event2.getHeaders().isEmpty()); + } + + @Test + public void testAll() throws Exception { + final Interceptor removeHeaderInterceptor = new RemoveHeaderIntBuilder() + .matchRegex("my-header2.*") + .fromList(HEADER1 + MY_SEPARATOR + HEADER3, MY_SEPARATOR) + .withName(HEADER2).build(); + final Event event1 = buildEventWithHeader(); + Assert.assertEquals(HEADER1, event1.getHeaders().get(HEADER1)); + Assert.assertEquals(HEADER2, event1.getHeaders().get(HEADER2)); + Assert.assertEquals(HEADER3, event1.getHeaders().get(HEADER3)); + Assert.assertEquals(HEADER4, event1.getHeaders().get(HEADER4)); + Assert.assertEquals(HEADER5, event1.getHeaders().get(HEADER5)); + removeHeaderInterceptor.intercept(event1); + Assert.assertTrue(event1.getHeaders().isEmpty()); + + final Event event2 = buildEventWithoutHeader(); + Assert.assertTrue(event2.getHeaders().isEmpty()); + removeHeaderInterceptor.intercept(event2); + Assert.assertTrue(event2.getHeaders().isEmpty()); +
} + + private static class RemoveHeaderIntBuilder { + final Map<String, String> contextMap = new HashMap<>(); + + RemoveHeaderIntBuilder withName(final String str) { + contextMap.put(RemoveHeaderInterceptor.WITH_NAME, str); + return this; + } + + RemoveHeaderIntBuilder fromList(final String str) { + contextMap.put(RemoveHeaderInterceptor.FROM_LIST, str); + return this; + } + + RemoveHeaderIntBuilder fromList(final String str, + final String separator) { + fromList(str); + contextMap.put(RemoveHeaderInterceptor.LIST_SEPARATOR, separator); + return this; + } + + RemoveHeaderIntBuilder matchRegex(final String str) { + contextMap.put(RemoveHeaderInterceptor.MATCH_REGEX, str); + return this; + } + + public Interceptor build() throws InstantiationException, IllegalAccessException, + ClassNotFoundException { + Interceptor.Builder builder = InterceptorBuilderFactory.newInstance( + InterceptorType.REMOVE_HEADER.toString()); + builder.configure(new Context(contextMap)); + return builder.build(); + } + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/de941e7b/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 07f84db..c73496c 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -3906,6 +3906,25 @@ Example for agent named a1: a1.sources.r1.interceptors.i1.key = datacenter a1.sources.r1.interceptors.i1.value = NEW_YORK + +Remove Header Interceptor +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +This interceptor manipulates Flume event headers, by removing one or many headers. It can remove a statically defined header, headers based on a regular expression or headers in a list. If none of these is defined, or if no header matches the criteria, the Flume events are not modified.
+ +Note that if only one header needs to be removed, specifying it by name provides performance benefits over the other two methods. + +===================== =========== =============================================================== +Property Name Default Description +===================== =========== =============================================================== +**type** -- The component type name has to be ``remove_header`` +withName -- Name of the header to remove +fromList -- List of headers to remove, separated with the separator specified with ``fromListSeparator`` +fromListSeparator ``\s*,\s*`` Regular expression used to separate multiple header names in the list specified using ``fromList`` +matching -- All the headers whose names match this regular expression are removed +===================== =========== =============================================================== + + UUID Interceptor ~~~~~~~~~~~~~~~~~~~~~~~~~~~
