Updated Branches: refs/heads/trunk eefefa941 -> d63c378b5
FLUME-2063: Add Configurable charset to RegexHbaseEventSerializer (Roman Shaposhnik via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d63c378b Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d63c378b Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d63c378b Branch: refs/heads/trunk Commit: d63c378b551bba338677d50a36d254dd8c62fe4a Parents: eefefa9 Author: Brock Noland <[email protected]> Authored: Sun Jun 2 17:02:19 2013 -0700 Committer: Brock Noland <[email protected]> Committed: Sun Jun 2 17:02:19 2013 -0700 ---------------------------------------------------------------------- .../sink/hbase/RegexHbaseEventSerializer.java | 18 ++++++++++---- .../sink/hbase/TestRegexHbaseEventSerializer.java | 18 ++++++++++----- 2 files changed, 25 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/d63c378b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java index 0df559d..27974d9 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java @@ -18,6 +18,7 @@ */ package org.apache.flume.sink.hbase; +import java.nio.charset.Charset; import java.util.Calendar; import java.util.List; import java.util.Map; @@ -67,6 +68,10 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { /** Whether to deposit event headers into corresponding column 
qualifiers */ public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders"; public static final boolean DEPOSIT_HEADERS_DEFAULT = false; + + /** What charset to use when serializing into HBase's byte arrays */ + public static final String CHARSET_CONFIG = "charset"; + public static final String CHARSET_DEFAULT = "UTF-8"; /* This is a nonce used in HBase row-keys, such that the same row-key * never gets written more than once from within this JVM. */ @@ -80,6 +85,7 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { private boolean regexIgnoreCase; private boolean depositHeaders; private Pattern inputPattern; + private Charset charset; @Override public void configure(Context context) { @@ -90,11 +96,13 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { DEPOSIT_HEADERS_DEFAULT); inputPattern = Pattern.compile(regex, Pattern.DOTALL + (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0)); + charset = Charset.forName(context.getString(CHARSET_CONFIG, + CHARSET_DEFAULT)); String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT); String[] columnNames = colNameStr.split(","); for (String s: columnNames) { - colNames.add(s.getBytes(Charsets.UTF_8)); + colNames.add(s.getBytes(charset)); } } @@ -131,7 +139,7 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { * data loss. 
*/ String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement()); - return rowKey.getBytes(Charsets.UTF_8); + return rowKey.getBytes(charset); } protected byte[] getRowKey() { @@ -142,7 +150,7 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { public List<Row> getActions() throws FlumeException { List<Row> actions = Lists.newArrayList(); byte[] rowKey; - Matcher m = inputPattern.matcher(new String(payload)); + Matcher m = inputPattern.matcher(new String(payload, charset)); if (!m.matches()) { return Lists.newArrayList(); } @@ -156,11 +164,11 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { Put put = new Put(rowKey); for (int i = 0; i < colNames.size(); i++) { - put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8)); + put.add(cf, colNames.get(i), m.group(i + 1).getBytes(charset)); } if (depositHeaders) { for (Map.Entry<String, String> entry : headers.entrySet()) { - put.add(cf, entry.getKey().getBytes(Charsets.UTF_8), entry.getValue().getBytes(Charsets.UTF_8)); + put.add(cf, entry.getKey().getBytes(charset), entry.getValue().getBytes(charset)); } } actions.add(put); http://git-wip-us.apache.org/repos/asf/flume/blob/d63c378b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java index 6cec36f..191dc54 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java @@ -18,11 +18,13 @@ */ package org.apache.flume.sink.hbase; 
+import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.nio.charset.Charset; import java.util.Calendar; import java.util.List; import java.util.Map; @@ -162,16 +164,19 @@ public class TestRegexHbaseEventSerializer { @Test /** Test depositing of the header information. */ public void testDepositHeaders() throws Exception { + Charset charset = Charset.forName("KOI8-R"); RegexHbaseEventSerializer s = new RegexHbaseEventSerializer(); Context context = new Context(); context.put(RegexHbaseEventSerializer.DEPOSIT_HEADERS_CONFIG, "true"); + context.put(RegexHbaseEventSerializer.CHARSET_CONFIG, + charset.toString()); s.configure(context); String body = "body"; Map<String, String> headers = Maps.newHashMap(); headers.put("header1", "value1"); - headers.put("header2", "value2"); + headers.put("заголовок2", "значение2"); Event e = EventBuilder.withBody(Bytes.toBytes(body), headers); s.initialize(e, "CF".getBytes()); @@ -184,14 +189,15 @@ public class TestRegexHbaseEventSerializer { List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf); assertTrue(kvPairs.size() == 3); - Map<String, String> resultMap = Maps.newHashMap(); + Map<String, byte[]> resultMap = Maps.newHashMap(); for (KeyValue kv : kvPairs) { - resultMap.put(new String(kv.getQualifier()), new String(kv.getValue())); + resultMap.put(new String(kv.getQualifier(), charset), kv.getValue()); } - assertEquals(body, resultMap.get("payload")); - assertEquals("value1", resultMap.get("header1")); - assertEquals("value2", resultMap.get("header2")); + assertEquals(body, new String(resultMap.get(RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT), charset)); + assertEquals("value1", new String(resultMap.get("header1"), charset)); + assertArrayEquals("значение2".getBytes(charset), resultMap.get("заголовок2")); + assertEquals("значение2".length(),
resultMap.get("заголовок2").length); List<Increment> increments = s.getIncrements(); assertEquals(0, increments.size());
