Updated Branches:
  refs/heads/trunk 1e66428c6 -> eefefa941

FLUME-2062. Make it possible for the HBase sink to deposit event headers into
corresponding column qualifiers

(Roman Shaposhnik via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/eefefa94
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/eefefa94
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/eefefa94

Branch: refs/heads/trunk
Commit: eefefa941a60c0982f0957804be0cafb4d83e46e
Parents: 1e66428
Author: Hari Shreedharan <[email protected]>
Authored: Fri May 31 12:19:21 2013 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Fri May 31 12:19:21 2013 -0700

----------------------------------------------------------------------
 .../sink/hbase/RegexHbaseEventSerializer.java      |   25 +++++++--
 .../sink/hbase/TestRegexHbaseEventSerializer.java  |   40 ++++++++++++++-
 2 files changed, 58 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/eefefa94/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
index 965d6b0..0df559d 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flume.sink.hbase;
 
 import java.util.Calendar;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -63,6 +64,9 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
   public static final String COL_NAME_CONFIG = "colNames";
   public static final String COLUMN_NAME_DEFAULT = "payload";
 
+  /** Whether to deposit event headers into corresponding column qualifiers */
+  public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders";
+  public static final boolean DEPOSIT_HEADERS_DEFAULT = false;
   
   /* This is a nonce used in HBase row-keys, such that the same row-key
    * never gets written more than once from within this JVM. */
@@ -72,7 +76,9 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
   protected byte[] cf;
   private byte[] payload;
   private List<byte[]> colNames = Lists.newArrayList();
+  private Map<String, String> headers;
   private boolean regexIgnoreCase;
+  private boolean depositHeaders;
   private Pattern inputPattern;
   
   @Override
@@ -80,6 +86,8 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
     String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT);
     regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG, 
         INGORE_CASE_DEFAULT);
+    depositHeaders = context.getBoolean(DEPOSIT_HEADERS_CONFIG,
+        DEPOSIT_HEADERS_DEFAULT);
     inputPattern = Pattern.compile(regex, Pattern.DOTALL
         + (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));
     
@@ -96,6 +104,7 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
 
   @Override
   public void initialize(Event event, byte[] columnFamily) {
+    this.headers = event.getHeaders();
     this.payload = event.getBody();
     this.cf = columnFamily;
   }
@@ -142,19 +151,23 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
       return Lists.newArrayList();
     }
     
-       try {
+    try {
       rowKey = getRowKey();
       Put put = new Put(rowKey);
       
       for (int i = 0; i < colNames.size(); i++) {
         put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8));
       }
+      if (depositHeaders) {
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
+          put.add(cf, entry.getKey().getBytes(Charsets.UTF_8), entry.getValue().getBytes(Charsets.UTF_8));
+        }
+      }
       actions.add(put);
-       }
-       catch (Exception e) {
-         throw new FlumeException("Could not get row key!", e);
-       }
-       return actions;
+    } catch (Exception e) {
+      throw new FlumeException("Could not get row key!", e);
+    }
+    return actions;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/eefefa94/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
index f63eed0..6cec36f 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
@@ -158,4 +158,42 @@ public class TestRegexHbaseEventSerializer {
     assertEquals("100-" + randomString + "-2", rk3);
     
   }
-}
\ No newline at end of file
+
+   @Test
+  /** Test depositing of the header information. */
+  public void testDepositHeaders() throws Exception {
+    RegexHbaseEventSerializer s = new RegexHbaseEventSerializer();
+    Context context = new Context();
+    context.put(RegexHbaseEventSerializer.DEPOSIT_HEADERS_CONFIG,
+        "true");
+    s.configure(context);
+
+    String body = "body";
+    Map<String, String> headers = Maps.newHashMap();
+    headers.put("header1", "value1");
+    headers.put("header2", "value2");
+
+    Event e = EventBuilder.withBody(Bytes.toBytes(body), headers);
+    s.initialize(e, "CF".getBytes());
+    List<Row> actions = s.getActions();
+    assertEquals(1, s.getActions().size());
+    assertTrue(actions.get(0) instanceof Put);
+
+    Put put = (Put) actions.get(0);
+    assertTrue(put.getFamilyMap().containsKey(s.cf));
+    List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf);
+    assertTrue(kvPairs.size() == 3);
+
+    Map<String, String> resultMap = Maps.newHashMap();
+    for (KeyValue kv : kvPairs) {
+      resultMap.put(new String(kv.getQualifier()), new String(kv.getValue()));
+    }
+
+    assertEquals(body, resultMap.get("payload"));
+    assertEquals("value1", resultMap.get("header1"));
+    assertEquals("value2", resultMap.get("header2"));
+
+    List<Increment> increments = s.getIncrements();
+    assertEquals(0, increments.size());
+  }
+}

Reply via email to