Repository: flume Updated Branches: refs/heads/trunk 9688cad6f -> 96f6b6284
FLUME-2324: Support writing to multiple HBase clusters using HBaseSink (Hari Shreedharan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/96f6b628 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/96f6b628 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/96f6b628 Branch: refs/heads/trunk Commit: 96f6b6284c6e8a645b122111059ee954e9bad7b5 Parents: 9688cad Author: Jarek Jarcec Cecho <[email protected]> Authored: Fri Feb 28 16:25:57 2014 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Fri Feb 28 16:25:57 2014 -0800 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 2 + .../org/apache/flume/sink/hbase/HBaseSink.java | 48 +++++++++++ .../apache/flume/sink/hbase/TestHBaseSink.java | 83 +++++++++++++++++++- 3 files changed, 131 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/96f6b628/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 2cd0996..96bf73e 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1836,6 +1836,8 @@ Property Name Default Desc **type** -- The component type name, needs to be ``hbase`` **table** -- The name of the table in Hbase to write to. **columnFamily** -- The column family in Hbase to write to. +zookeeperQuorum -- The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml +znodeParent /hbase The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml batchSize 100 Number of events to be written per txn. serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer Default increment column = "iCol", payload column = "pCol". serializer.* -- Properties to be passed to the serializer. http://git-wip-us.apache.org/repos/asf/flume/blob/96f6b628/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java index f5cb229..c4a666c 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java @@ -33,6 +33,7 @@ import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.AbstractSink; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; @@ -221,9 +222,56 @@ public class HBaseSink extends AbstractSink implements Configurable { "writes to HBase will have WAL disabled, and any data in the " + "memstore of this region in the Region Server could be lost!"); } + String zkQuorum = context.getString(HBaseSinkConfigurationConstants + .ZK_QUORUM); + Integer port = null; + /** + * HBase allows multiple nodes in the quorum, but all need to use the + * same client port. So get the nodes in host:port format, + * and ignore the ports for all nodes except the first one. If no port is + * specified, use default. + */ + if (zkQuorum != null && !zkQuorum.isEmpty()) { + StringBuilder zkBuilder = new StringBuilder(); + logger.info("Using ZK Quorum: " + zkQuorum); + String[] zkHosts = zkQuorum.split(","); + int length = zkHosts.length; + for(int i = 0; i < length; i++) { + String[] zkHostAndPort = zkHosts[i].split(":"); + zkBuilder.append(zkHostAndPort[0].trim()); + if(i != length-1) { + zkBuilder.append(","); + } else { + zkQuorum = zkBuilder.toString(); + } + if (zkHostAndPort[1] == null) { + throw new FlumeException("Expected client port for the ZK node!"); + } + if (port == null) { + port = Integer.parseInt(zkHostAndPort[1].trim()); + } else if (!port.equals(Integer.parseInt(zkHostAndPort[1].trim()))) { + throw new FlumeException("All Zookeeper nodes in the quorum must " + + "use the same client port."); + } + } + if(port == null) { + port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT; + } + this.config.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum); + this.config.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, port); + } + String hbaseZnode = context.getString( + HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT); + if(hbaseZnode != null && !hbaseZnode.isEmpty()) { + this.config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, hbaseZnode); + } sinkCounter = new SinkCounter(this.getName()); } + public Configuration getConfig() { + return config; + } + @Override public Status process() throws EventDeliveryException { Status status = Status.READY; http://git-wip-us.apache.org/repos/asf/flume/blob/96f6b628/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java index 068f543..cb7c6ea 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; @@ -77,6 +78,8 @@ public class TestHBaseSink { testUtility.shutdownMiniCluster(); } + + @Test public void testOneEventWithDefaults() throws Exception { //Create a context without setting increment column and payload Column @@ -90,7 +93,7 @@ public class TestHBaseSink { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); HBaseSink sink = new HBaseSink(testUtility.getConfiguration()); - Configurables.configure(sink, tmpctx); + Configurables.configure(sink, ctx); Channel channel = new MemoryChannel(); Configurables.configure(channel, new Context()); sink.setChannel(channel); @@ -440,6 +443,82 @@ public class TestHBaseSink { testUtility.deleteTable(tableName.getBytes()); } + @Test + public void testWithoutConfigurationObject() throws Exception{ + ctx.put("batchSize", "2"); + ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, + ZKConfig.getZKQuorumServersString(testUtility.getConfiguration()) ); + System.out.print(ctx.getString(HBaseSinkConfigurationConstants.ZK_QUORUM)); + ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, + testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + HBaseSink sink = new HBaseSink(); + Configurables.configure(sink, ctx); + // Reset context to values usable by other tests. + ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, null); + ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,null); + ctx.put("batchSize", "100"); + Channel channel = new MemoryChannel(); + Configurables.configure(channel, ctx); + sink.setChannel(channel); + sink.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + for(int i = 0; i < 3; i++){ + Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); + channel.put(e); + } + tx.commit(); + tx.close(); + Status status = Status.READY; + while(status != Status.BACKOFF){ + status = sink.process(); + } + sink.stop(); + HTable table = new HTable(testUtility.getConfiguration(), tableName); + byte[][] results = getResults(table, 3); + byte[] out; + int found = 0; + for(int i = 0; i < 3; i++){ + for(int j = 0; j < 3; j++){ + if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){ + found++; + break; + } + } + } + Assert.assertEquals(3, found); + out = results[3]; + Assert.assertArrayEquals(Longs.toByteArray(3), out); + } -} + @Test + public void testZKQuorum() throws Exception{ + String zkQuorum = "zk1.flume.apache.org:3342, zk2.flume.apache.org:3342, " + + "zk3.flume.apache.org:3342"; + ctx.put("batchSize", "2"); + ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum); + ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, + testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + HBaseSink sink = new HBaseSink(); + Configurables.configure(sink, ctx); + Assert.assertEquals("zk1.flume.apache.org,zk2.flume.apache.org," + + "zk3.flume.apache.org", sink.getConfig().get(HConstants + .ZOOKEEPER_QUORUM)); + Assert.assertEquals(String.valueOf(3342), sink.getConfig().get(HConstants + .ZOOKEEPER_CLIENT_PORT)); + } + @Test (expected = FlumeException.class) + public void testZKQuorumIncorrectPorts() throws Exception{ + String zkQuorum = "zk1.flume.apache.org:3345, zk2.flume.apache.org:3342, " + + "zk3.flume.apache.org:3342"; + ctx.put("batchSize", "2"); + ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum); + ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, + testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + HBaseSink sink = new HBaseSink(); + Configurables.configure(sink, ctx); + Assert.fail(); + } +} \ No newline at end of file
