Repository: flume Updated Branches: refs/heads/trunk 8410ad307 -> 61e3b3881
FLUME-2431. Add simple regex search-and-replace interceptor (Mike Percy via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/61e3b388 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/61e3b388 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/61e3b388 Branch: refs/heads/trunk Commit: 61e3b3881b406b4a75897342fa12e84ecb2bcd3c Parents: 8410ad3 Author: Hari Shreedharan <[email protected]> Authored: Tue Jul 22 20:00:48 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Tue Jul 22 20:01:26 2014 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/flume/Context.java | 9 ++ .../flume/interceptor/InterceptorType.java | 3 +- .../SearchAndReplaceInterceptor.java | 129 +++++++++++++++++++ .../TestSearchAndReplaceInterceptor.java | 87 +++++++++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 39 ++++++ 5 files changed, 266 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/61e3b388/flume-ng-configuration/src/main/java/org/apache/flume/Context.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/Context.java b/flume-ng-configuration/src/main/java/org/apache/flume/Context.java index 927636c..c0460d2 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/Context.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/Context.java @@ -112,6 +112,15 @@ public class Context { public void put(String key, String value) { parameters.put(key, value); } + + /** + * Returns true if this Context's parameters contain a mapping for + * {@code key}; otherwise returns false. Lets callers tell an absent + * parameter apart from a present one before falling back to a default. 
+ * + * @param key the parameter name to look up + * @return whether a mapping for {@code key} exists + */ + public boolean containsKey(String key) { + return parameters.containsKey(key); + } + /** * Gets value mapped to key, returning defaultValue if unmapped.
* @param key to be found http://git-wip-us.apache.org/repos/asf/flume/blob/61e3b388/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java index c84cea5..fe341e9 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java +++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java @@ -25,7 +25,8 @@ public enum InterceptorType { STATIC(org.apache.flume.interceptor.StaticInterceptor.Builder.class), REGEX_FILTER( org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class), - REGEX_EXTRACTOR(org.apache.flume.interceptor.RegexExtractorInterceptor.Builder.class); + REGEX_EXTRACTOR(org.apache.flume.interceptor.RegexExtractorInterceptor.Builder.class), + SEARCH_REPLACE(org.apache.flume.interceptor.SearchAndReplaceInterceptor.Builder.class); private final Class<? extends Interceptor.Builder> builderClass; http://git-wip-us.apache.org/repos/asf/flume/blob/61e3b388/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java new file mode 100644 index 0000000..6f5c146 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.interceptor; + +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * <p> + * Interceptor that allows search-and-replace of event body strings using + * regular expressions. This only works with event bodies that are valid + * strings in the configured charset (UTF-8 by default). + * <p> + * Usage: + * <pre> + * agent.source-1.interceptors.search-replace.searchPattern = ^INFO: + * agent.source-1.interceptors.search-replace.replaceString = Log msg: + * </pre> + * <p> + * Any regular expression search pattern and replacement pattern that can be + * used with {@link java.util.regex.Matcher#replaceAll(String)} may be used, + * including capturing groups, which the replacement string can reference as + * {@code $1}, {@code $2}, and so on.
+ */ +public class SearchAndReplaceInterceptor implements Interceptor { + + private static final Logger logger = LoggerFactory + .getLogger(SearchAndReplaceInterceptor.class); + + private final Pattern searchPattern; + private final String replaceString; + private final Charset charset; + + private SearchAndReplaceInterceptor(Pattern searchPattern, + String replaceString, + Charset charset) { + this.searchPattern = searchPattern; + this.replaceString = replaceString; + this.charset = charset; + } + + @Override + public void initialize() { + } + + @Override + public void close() { + } + + @Override + public Event intercept(Event event) { + String origBody = new String(event.getBody(), charset); + Matcher matcher = searchPattern.matcher(origBody); + String newBody = matcher.replaceAll(replaceString); + event.setBody(newBody.getBytes(charset)); + return event; + } + + @Override + public List<Event> intercept(List<Event> events) { + for (Event event : events) { + intercept(event); + } + return events; + } + + public static class Builder implements Interceptor.Builder { + private static final String SEARCH_PAT_KEY = "searchPattern"; + private static final String REPLACE_STRING_KEY = "replaceString"; + private static final String CHARSET_KEY = "charset"; + + private Pattern searchRegex; + private String replaceString; + private Charset charset = Charsets.UTF_8; + + @Override + public void configure(Context context) { + String searchPattern = context.getString(SEARCH_PAT_KEY); + Preconditions.checkArgument(!StringUtils.isEmpty(searchPattern), + "Must supply a valid search pattern " + SEARCH_PAT_KEY + + " (may not be empty)"); + + replaceString = context.getString(REPLACE_STRING_KEY); + Preconditions.checkNotNull(replaceString, + "Must supply a replacement string " + REPLACE_STRING_KEY + + " (empty is ok)"); + + searchRegex = Pattern.compile(searchPattern); + + if (context.containsKey(CHARSET_KEY)) { + // May throw IllegalArgumentException for unsupported charsets. 
+ charset = Charset.forName(context.getString(CHARSET_KEY)); + } + } + + @Override + public Interceptor build() { + Preconditions.checkNotNull(searchRegex, + "Regular expression search pattern required"); + Preconditions.checkNotNull(replaceString, + "Replacement string required"); + return new SearchAndReplaceInterceptor(searchRegex, replaceString, charset); + } + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/61e3b388/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java new file mode 100644 index 0000000..b39f912 --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flume.interceptor; + +import com.google.common.base.Charsets; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; + +/** + * Unit tests for {@link SearchAndReplaceInterceptor}, obtained through + * {@link InterceptorBuilderFactory} using the {@code SEARCH_REPLACE} + * interceptor type. + */ +public class TestSearchAndReplaceInterceptor { + + private static final Logger logger = + LoggerFactory.getLogger(TestSearchAndReplaceInterceptor.class); + + /** + * Builds a search-and-replace interceptor configured from {@code context}, + * runs it on an event whose body is {@code input} encoded as UTF-8, asserts + * that the UTF-8-decoded result equals {@code output}, and logs the result. + */ + private void testSearchReplace(Context context, String input, String output) + throws Exception { + Interceptor.Builder builder = InterceptorBuilderFactory.newInstance( + InterceptorType.SEARCH_REPLACE.toString()); + builder.configure(context); + Interceptor interceptor = builder.build(); + + Event event = EventBuilder.withBody(input, Charsets.UTF_8); + event = interceptor.intercept(event); + String val = new String(event.getBody(), Charsets.UTF_8); + assertEquals(output, val); + logger.info(val); + } + + /** A '^'-anchored pattern with an empty replacement strips a leading prefix. */ + @Test + public void testRemovePrefix() throws Exception { + Context context = new Context(); + context.put("searchPattern", "^prefix"); + context.put("replaceString", ""); + testSearchReplace(context, "prefix non-prefix suffix", " non-prefix suffix"); + } + + /** Removes a leading {@code <N>} priority tag from a syslog-style line. 
*/ + @Test + public void testSyslogStripPriority() throws Exception { + final String input = "<13>Feb 5 17:32:18 10.0.0.99 Use the BFG!"; + final String output = "Feb 5 17:32:18 10.0.0.99 Use the BFG!"; + Context context = new Context(); + context.put("searchPattern", "^<[0-9]+>"); + context.put("replaceString", ""); + testSearchReplace(context, input, output); + } + + /** + * $1 and $2 in the replacement refer to the first and second captured + * groups of the search pattern, so words can be reordered. + */ + @Test + public void testCapturedGroups() throws Exception { + final String input = "The quick brown fox jumped over the lazy dog."; + final String output = "The hungry dog ate the careless fox."; + Context context = new Context(); + // NOTE(review): the trailing '.' is an unescaped regex metacharacter + // (matches any character); here it happens to match the literal '.'. + context.put("searchPattern", "The quick brown ([a-z]+) jumped over the lazy ([a-z]+)."); + context.put("replaceString", "The hungry $2
ate the careless $1."); + testSearchReplace(context, input, output); + } + + /** Every match in the body is replaced, not only the first one. */ + @Test + public void testRepeatedRemoval() throws Exception { + final String input = "Email addresses: [email protected] and [email protected]"; + final String output = "Email addresses: REDACTED and REDACTED"; + Context context = new Context(); + context.put("searchPattern", "[A-Za-z0-9_.]+@[A-Za-z0-9_-]+\\.com"); + context.put("replaceString", "REDACTED"); + testSearchReplace(context, input, output); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/61e3b388/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 1e98725..daf6e72 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2841,6 +2841,45 @@ Sample flume.conf file: a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1 +Search and Replace Interceptor +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +This interceptor provides simple string-based search-and-replace functionality +based on Java regular expressions. Backtracking / group capture is also available. +This interceptor uses the same rules as in the Java Matcher.replaceAll() method. + +================ ======= ======================================================================== +Property Name Default Description +================ ======= ======================================================================== +**type** -- The component type name has to be ``search_replace`` +searchPattern -- The pattern to search for and replace. +replaceString -- The replacement string. +charset UTF-8 The charset of the event body. Assumed by default to be UTF-8.
+================ ======= ======================================================================== + +Example configuration: + +.. code-block:: properties + + a1.sources.avroSrc.interceptors = search-replace + a1.sources.avroSrc.interceptors.search-replace.type = search_replace + + # Remove the leading run of word characters (letters, digits and underscores) from each event body. + a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+ + a1.sources.avroSrc.interceptors.search-replace.replaceString = + +Another example: + +.. code-block:: properties + + a1.sources.avroSrc.interceptors = search-replace + a1.sources.avroSrc.interceptors.search-replace.type = search_replace + + # Use capture groups ($1, $2) to reorder and rewrite words on a line. + a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+) + a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1 + + Regex Filtering Interceptor ~~~~~~~~~~~~~~~~~~~~~~~~~~~
