Repository: flume
Updated Branches:
  refs/heads/trunk 8410ad307 -> 61e3b3881


FLUME-2431. Add simple regex search-and-replace interceptor

(Mike Percy via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/61e3b388
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/61e3b388
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/61e3b388

Branch: refs/heads/trunk
Commit: 61e3b3881b406b4a75897342fa12e84ecb2bcd3c
Parents: 8410ad3
Author: Hari Shreedharan <[email protected]>
Authored: Tue Jul 22 20:00:48 2014 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Tue Jul 22 20:01:26 2014 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/flume/Context.java |   9 ++
 .../flume/interceptor/InterceptorType.java      |   3 +-
 .../SearchAndReplaceInterceptor.java            | 129 +++++++++++++++++++
 .../TestSearchAndReplaceInterceptor.java        |  87 +++++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  39 ++++++
 5 files changed, 266 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/61e3b388/flume-ng-configuration/src/main/java/org/apache/flume/Context.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/Context.java b/flume-ng-configuration/src/main/java/org/apache/flume/Context.java
index 927636c..c0460d2 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/Context.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/Context.java
@@ -112,6 +112,15 @@ public class Context {
   public void put(String key, String value) {
     parameters.put(key, value);
   }
+
+  /**
+   * Checks whether this Context has a mapping for the given key.
+   *
+   * @param key the parameter name to look up
+   * @return {@code true} if a mapping for {@code key} exists,
+   *         {@code false} otherwise
+   */
+  public boolean containsKey(String key) {
+    final boolean present = this.parameters.containsKey(key);
+    return present;
+  }
+
   /**
    * Gets value mapped to key, returning defaultValue if unmapped.
    * @param key to be found

http://git-wip-us.apache.org/repos/asf/flume/blob/61e3b388/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
index c84cea5..fe341e9 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
@@ -25,7 +25,8 @@ public enum InterceptorType {
   STATIC(org.apache.flume.interceptor.StaticInterceptor.Builder.class),
   REGEX_FILTER(
       org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class),
-  
REGEX_EXTRACTOR(org.apache.flume.interceptor.RegexExtractorInterceptor.Builder.class);
+  
REGEX_EXTRACTOR(org.apache.flume.interceptor.RegexExtractorInterceptor.Builder.class),
+  
SEARCH_REPLACE(org.apache.flume.interceptor.SearchAndReplaceInterceptor.Builder.class);
 
   private final Class<? extends Interceptor.Builder> builderClass;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/61e3b388/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java
new file mode 100644
index 0000000..6f5c146
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * <p>
+ * Interceptor that allows search-and-replace of event body strings using
+ * regular expressions. This only works with event bodies that are valid
+ * strings. The charset is configurable.
+ * <p>
+ * Usage:
+ * <pre>
+ *   agent.source-1.interceptors.search-replace.searchPattern = ^INFO:
+ *   agent.source-1.interceptors.search-replace.replaceString = Log msg:
+ * </pre>
+ * <p>
+ * Any regular expression search pattern and replacement pattern that can be
+ * used with {@link java.util.regex.Matcher#replaceAll(String)} may be used,
+ * including backtracking and grouping.
+ */
+public class SearchAndReplaceInterceptor implements Interceptor {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(SearchAndReplaceInterceptor.class);
+
+  private final Pattern searchPattern;
+  private final String replaceString;
+  private final Charset charset;
+
+  private SearchAndReplaceInterceptor(Pattern searchPattern,
+    String replaceString,
+    Charset charset) {
+    this.searchPattern = searchPattern;
+    this.replaceString = replaceString;
+    this.charset = charset;
+  }
+
+  @Override
+  public void initialize() {
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public Event intercept(Event event) {
+    String origBody = new String(event.getBody(), charset);
+    Matcher matcher = searchPattern.matcher(origBody);
+    String newBody = matcher.replaceAll(replaceString);
+    event.setBody(newBody.getBytes(charset));
+    return event;
+  }
+
+  @Override
+  public List<Event> intercept(List<Event> events) {
+    for (Event event : events) {
+      intercept(event);
+    }
+    return events;
+  }
+
+  public static class Builder implements Interceptor.Builder {
+    private static final String SEARCH_PAT_KEY = "searchPattern";
+    private static final String REPLACE_STRING_KEY = "replaceString";
+    private static final String CHARSET_KEY = "charset";
+
+    private Pattern searchRegex;
+    private String replaceString;
+    private Charset charset = Charsets.UTF_8;
+
+    @Override
+    public void configure(Context context) {
+      String searchPattern = context.getString(SEARCH_PAT_KEY);
+      Preconditions.checkArgument(!StringUtils.isEmpty(searchPattern),
+          "Must supply a valid search pattern " + SEARCH_PAT_KEY +
+          " (may not be empty)");
+
+      replaceString = context.getString(REPLACE_STRING_KEY);
+      Preconditions.checkNotNull(replaceString,
+          "Must supply a replacement string " + REPLACE_STRING_KEY +
+          " (empty is ok)");
+
+      searchRegex = Pattern.compile(searchPattern);
+
+      if (context.containsKey(CHARSET_KEY)) {
+        // May throw IllegalArgumentException for unsupported charsets.
+        charset = Charset.forName(context.getString(CHARSET_KEY));
+      }
+    }
+
+    @Override
+    public Interceptor build() {
+      Preconditions.checkNotNull(searchRegex,
+                                 "Regular expression search pattern required");
+      Preconditions.checkNotNull(replaceString,
+                                 "Replacement string required");
+      return new SearchAndReplaceInterceptor(searchRegex, replaceString, 
charset);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/61e3b388/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java
new file mode 100644
index 0000000..b39f912
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.interceptor;
+
+import com.google.common.base.Charsets;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSearchAndReplaceInterceptor {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(TestSearchAndReplaceInterceptor.class);
+
+  /**
+   * Builds a SEARCH_REPLACE interceptor from {@code context}, runs it on a
+   * UTF-8 event whose body is {@code input}, and asserts that the resulting
+   * body, decoded as UTF-8, equals {@code output}.
+   */
+  private void testSearchReplace(Context context, String input, String output)
+      throws Exception {
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+        InterceptorType.SEARCH_REPLACE.toString());
+    builder.configure(context);
+    Interceptor interceptor = builder.build();
+
+    Event event = EventBuilder.withBody(input, Charsets.UTF_8);
+    event = interceptor.intercept(event);
+    String val = new String(event.getBody(), Charsets.UTF_8);
+    assertEquals(output, val);
+    logger.info(val);
+  }
+
+  @Test
+  public void testRemovePrefix() throws Exception {
+    Context context = new Context();
+    context.put("searchPattern", "^prefix");
+    context.put("replaceString", "");
+    testSearchReplace(context, "prefix non-prefix suffix",
+        " non-prefix suffix");
+  }
+
+  @Test
+  public void testSyslogStripPriority() throws Exception {
+    final String input = "<13>Feb  5 17:32:18 10.0.0.99 Use the BFG!";
+    final String output = "Feb  5 17:32:18 10.0.0.99 Use the BFG!";
+    Context context = new Context();
+    context.put("searchPattern", "^<[0-9]+>");
+    context.put("replaceString", "");
+    testSearchReplace(context, input, output);
+  }
+
+  @Test
+  public void testCapturedGroups() throws Exception {
+    final String input = "The quick brown fox jumped over the lazy dog.";
+    final String output = "The hungry dog ate the careless fox.";
+    Context context = new Context();
+    context.put("searchPattern",
+        "The quick brown ([a-z]+) jumped over the lazy ([a-z]+).");
+    context.put("replaceString", "The hungry $2 ate the careless $1.");
+    testSearchReplace(context, input, output);
+  }
+
+  @Test
+  public void testRepeatedRemoval() throws Exception {
+    // Both addresses match the pattern, so replaceAll must redact each one.
+    final String input = "Email addresses: test@test.com and foo@test.com";
+    final String output = "Email addresses: REDACTED and REDACTED";
+    Context context = new Context();
+    context.put("searchPattern", "[A-Za-z0-9_.]+@[A-Za-z0-9_-]+\\.com");
+    context.put("replaceString", "REDACTED");
+    testSearchReplace(context, input, output);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/61e3b388/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 1e98725..daf6e72 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2841,6 +2841,45 @@ Sample flume.conf file:
  a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
   a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
 
+Search and Replace Interceptor
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
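+This interceptor provides simple string-based search-and-replace functionality
+based on Java regular expressions. Capturing groups are supported, and they
+can be referenced in the replacement string (for example ``$1``).
+This interceptor uses the same rules as the Java ``Matcher.replaceAll()`` method,
+so literal ``$`` and ``\`` characters in the replacement string must be escaped.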
+
+================  =======  ========================================================================
+Property Name     Default  Description
+================  =======  ========================================================================
+**type**          --       The component type name has to be ``search_replace``
+searchPattern     --       The pattern to search for and replace.
+replaceString     --       The replacement string.
+charset           UTF-8    The charset of the event body. Assumed by default to be UTF-8.
+================  =======  ========================================================================
+
+Example configuration:
+
+.. code-block:: properties
+
+  a1.sources.avroSrc.interceptors = search-replace
+  a1.sources.avroSrc.interceptors.search-replace.type = search_replace
+
+  # Remove leading alphanumeric characters in an event body.
+  a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
+  a1.sources.avroSrc.interceptors.search-replace.replaceString =
+
+Another example:
+
+.. code-block:: properties
+
+  a1.sources.avroSrc.interceptors = search-replace
+  a1.sources.avroSrc.interceptors.search-replace.type = search_replace
+
+  # Use grouping operators to reorder and munge words on a line.
+  a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
+  a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1
+
+
 Regex Filtering Interceptor
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

Reply via email to