Updated Branches: refs/heads/trunk a13e9e6a8 -> 669e5d327
FLUME-1757. Improve configuration of hbase serializers. (Sravya Tirukkovalur via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/669e5d32 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/669e5d32 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/669e5d32 Branch: refs/heads/trunk Commit: 669e5d327720021391245949d6a947b4e963b728 Parents: a13e9e6 Author: Hari Shreedharan <[email protected]> Authored: Mon May 20 20:12:38 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Mon May 20 20:12:38 2013 -0700 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 2 +- .../hbase/SimpleAsyncHbaseEventSerializer.java | 4 +- .../sink/hbase/SimpleHbaseEventSerializer.java | 4 +- .../flume/sink/hbase/TestAsyncHBaseSink.java | 42 ++++++++++++++- .../org/apache/flume/sink/hbase/TestHBaseSink.java | 41 +++++++++++++- 5 files changed, 84 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 2ee41be..1b4d216 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1697,7 +1697,7 @@ Property Name Default Desc **table** -- The name of the table in Hbase to write to. **columnFamily** -- The column family in Hbase to write to. batchSize 100 Number of events to be written per txn. -serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer +serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer Default increment column = "iCol", payload column = "pCol". serializer.* -- Properties to be passed to the serializer. kerberosPrincipal -- Kerberos user principal for accessing secure HBase kerberosKeytab -- Kerberos keytab for accessing secure HBase http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java index dd19616..96095d1 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java @@ -115,8 +115,8 @@ public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerialize @Override public void configure(Context context) { - String pCol = context.getString("payloadColumn"); - String iCol = context.getString("incrementColumn"); + String pCol = context.getString("payloadColumn", "pCol"); + String iCol = context.getString("incrementColumn", "iCol"); rowPrefix = context.getString("rowPrefix", "default"); String suffix = context.getString("suffix", "uuid"); if(pCol != null && !pCol.isEmpty()) { http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java index 52bc84d..758252b 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java @@ -70,8 +70,8 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer { context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); String suffix = context.getString("suffix", "uuid"); - String payloadColumn = context.getString("payloadColumn"); - String incColumn = context.getString("incrementColumn"); + String payloadColumn = context.getString("payloadColumn","pCol"); + String incColumn = context.getString("incrementColumn","iCol"); if(payloadColumn != null && !payloadColumn.isEmpty()) { if(suffix.equals("timestamp")){ keyType = KeyType.TS; http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java index 03c3e4c..7ddfdae 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java @@ -69,8 +69,8 @@ public class TestAsyncHBaseSink { private static String tableName = "TestHbaseSink"; private static String columnFamily = "TestColumnFamily"; - private static String inColumn = "Increment"; - private static String plCol = "pc"; + private static String inColumn = "iCol"; + private static String plCol = "pCol"; private static Context ctx = new Context(); private static String valBase = "testing hbase sink: jham"; private boolean deleteTable = true; @@ -164,6 +164,44 @@ public class TestAsyncHBaseSink { } @Test + public void testOneEventWithDefaults() throws Exception { + Map<String,String> ctxMap = new HashMap<String,String>(); + ctxMap.put("table", tableName); + ctxMap.put("columnFamily", columnFamily); + ctxMap.put("serializer", + "org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer"); + ctxMap.put("keep-alive", "0"); + ctxMap.put("timeout", "10000"); + Context tmpctx = new Context(); + tmpctx.putAll(ctxMap); + + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + deleteTable = true; + AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration()); + Configurables.configure(sink, tmpctx); + Channel channel = new MemoryChannel(); + Configurables.configure(channel, tmpctx); + sink.setChannel(channel); + sink.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + Event e = EventBuilder.withBody( + Bytes.toBytes(valBase)); + channel.put(e); + tx.commit(); + tx.close(); + Assert.assertFalse(sink.isConfNull()); + sink.process(); + sink.stop(); + HTable table = new HTable(testUtility.getConfiguration(), tableName); + byte[][] results = getResults(table, 1); + byte[] out = results[0]; + Assert.assertArrayEquals(e.getBody(), out); + out = results[1]; + Assert.assertArrayEquals(Longs.toByteArray(1), out); + } + + @Test public void testOneEvent() throws Exception { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); deleteTable = true; http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java index ad94fc9..ab4128e 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java @@ -55,8 +55,8 @@ public class TestHBaseSink { private static HBaseTestingUtility testUtility = new HBaseTestingUtility(); private static String tableName = "TestHbaseSink"; private static String columnFamily = "TestColumnFamily"; - private static String inColumn = "Increment"; - private static String plCol = "pc"; + private static String inColumn = "iCol"; + private static String plCol = "pCol"; private static Context ctx = new Context(); private static String valBase = "testing hbase sink: jham"; @@ -80,6 +80,43 @@ public class TestHBaseSink { } @Test + public void testOneEventWithDefaults() throws Exception { + //Create a context without setting increment column and payload Column + Map<String,String> ctxMap = new HashMap<String,String>(); + ctxMap.put("table", tableName); + ctxMap.put("columnFamily", columnFamily); + ctxMap.put("serializer", + "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer"); + Context tmpctx = new Context(); + tmpctx.putAll(ctxMap); + + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + HBaseSink sink = new HBaseSink(testUtility.getConfiguration()); + Configurables.configure(sink, tmpctx); + Channel channel = new MemoryChannel(); + Configurables.configure(channel, new Context()); + sink.setChannel(channel); + sink.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + Event e = EventBuilder.withBody( + Bytes.toBytes(valBase)); + channel.put(e); + tx.commit(); + tx.close(); + + sink.process(); + sink.stop(); + HTable table = new HTable(testUtility.getConfiguration(), tableName); + byte[][] results = getResults(table, 1); + byte[] out = results[0]; + Assert.assertArrayEquals(e.getBody(), out); + out = results[1]; + Assert.assertArrayEquals(Longs.toByteArray(1), out); + testUtility.deleteTable(tableName.getBytes()); + } + + @Test public void testOneEvent() throws Exception { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
