Updated Branches: refs/heads/flume-1.4 7f66495d9 -> f4239fca1
FLUME-2062. Make it possible for HBase sink to deposit event headers into corresponding column qualifiers (Roman Shaposhnik via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f4239fca Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f4239fca Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f4239fca Branch: refs/heads/flume-1.4 Commit: f4239fca168a63bfb58e777397faeff9ef2209f0 Parents: 7f66495 Author: Hari Shreedharan <[email protected]> Authored: Fri May 31 12:19:21 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri May 31 12:38:33 2013 -0700 ---------------------------------------------------------------------- .../sink/hbase/RegexHbaseEventSerializer.java | 25 +++++++-- .../sink/hbase/TestRegexHbaseEventSerializer.java | 40 ++++++++++++++- 2 files changed, 58 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/f4239fca/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java index 965d6b0..0df559d 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java @@ -20,6 +20,7 @@ package org.apache.flume.sink.hbase; import java.util.Calendar; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -63,6 +64,9 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { public static final String COL_NAME_CONFIG = "colNames"; public static final String COLUMN_NAME_DEFAULT = "payload"; + /** Whether to deposit event headers into corresponding column qualifiers */ + public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders"; + public static final boolean DEPOSIT_HEADERS_DEFAULT = false; /* This is a nonce used in HBase row-keys, such that the same row-key * never gets written more than once from within this JVM. */ @@ -72,7 +76,9 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { protected byte[] cf; private byte[] payload; private List<byte[]> colNames = Lists.newArrayList(); + private Map<String, String> headers; private boolean regexIgnoreCase; + private boolean depositHeaders; private Pattern inputPattern; @Override @@ -80,6 +86,8 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT); regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG, INGORE_CASE_DEFAULT); + depositHeaders = context.getBoolean(DEPOSIT_HEADERS_CONFIG, + DEPOSIT_HEADERS_DEFAULT); inputPattern = Pattern.compile(regex, Pattern.DOTALL + (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0)); @@ -96,6 +104,7 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { @Override public void initialize(Event event, byte[] columnFamily) { + this.headers = event.getHeaders(); this.payload = event.getBody(); this.cf = columnFamily; } @@ -142,19 +151,23 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { return Lists.newArrayList(); } - try { + try { rowKey = getRowKey(); Put put = new Put(rowKey); for (int i = 0; i < colNames.size(); i++) { put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8)); } + if (depositHeaders) { + for (Map.Entry<String, String> entry : headers.entrySet()) { + put.add(cf, entry.getKey().getBytes(Charsets.UTF_8), entry.getValue().getBytes(Charsets.UTF_8)); + } + } actions.add(put); - } - catch (Exception e) { - throw new FlumeException("Could not get row key!", e); - } - return actions; + } catch (Exception e) { + throw new FlumeException("Could not get row key!", e); + } + return actions; } @Override http://git-wip-us.apache.org/repos/asf/flume/blob/f4239fca/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java index f63eed0..6cec36f 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java @@ -158,4 +158,42 @@ public class TestRegexHbaseEventSerializer { assertEquals("100-" + randomString + "-2", rk3); } -} \ No newline at end of file + + @Test + /** Test depositing of the header information. */ + public void testDepositHeaders() throws Exception { + RegexHbaseEventSerializer s = new RegexHbaseEventSerializer(); + Context context = new Context(); + context.put(RegexHbaseEventSerializer.DEPOSIT_HEADERS_CONFIG, + "true"); + s.configure(context); + + String body = "body"; + Map<String, String> headers = Maps.newHashMap(); + headers.put("header1", "value1"); + headers.put("header2", "value2"); + + Event e = EventBuilder.withBody(Bytes.toBytes(body), headers); + s.initialize(e, "CF".getBytes()); + List<Row> actions = s.getActions(); + assertEquals(1, s.getActions().size()); + assertTrue(actions.get(0) instanceof Put); + + Put put = (Put) actions.get(0); + assertTrue(put.getFamilyMap().containsKey(s.cf)); + List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf); + assertTrue(kvPairs.size() == 3); + + Map<String, String> resultMap = Maps.newHashMap(); + for (KeyValue kv : kvPairs) { + resultMap.put(new String(kv.getQualifier()), new String(kv.getValue())); + } + + assertEquals(body, resultMap.get("payload")); + assertEquals("value1", resultMap.get("header1")); + assertEquals("value2", resultMap.get("header2")); + + List<Increment> increments = s.getIncrements(); + assertEquals(0, increments.size()); + } +}
