Updated Branches: refs/heads/flume-1.4 cd3cf7ef9 -> 6d63f9778
FLUME-1821: Support configuration of hbase instances to be used in AsyncHBaseSink from flume config (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6d63f977 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6d63f977 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6d63f977 Branch: refs/heads/flume-1.4 Commit: 6d63f9778289ff414c0fb79fd661b2019b9b9310 Parents: cd3cf7e Author: Brock Noland <[email protected]> Authored: Thu Jan 10 10:24:10 2013 -0800 Committer: Brock Noland <[email protected]> Committed: Thu Jan 10 10:24:28 2013 -0800 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 11 +++- .../apache/flume/sink/hbase/AsyncHBaseSink.java | 30 ++++++- .../hbase/HBaseSinkConfigurationConstants.java | 10 +++ .../flume/sink/hbase/TestAsyncHBaseSink.java | 63 ++++++++++++++- 4 files changed, 109 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/6d63f977/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 63b8f9b..58a115e 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1526,7 +1526,6 @@ HBase puts and/or increments. These puts and increments are then written to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of Hbase failing to write certain events, the sink will replay all events in that transaction. -This sink is still experimental. The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink. Required properties are in **bold**. 
@@ -1536,6 +1535,8 @@ Property Name Default **channel** -- **type** -- The component type name, needs to be ``org.apache.flume.sink.hbase.AsyncHBaseSink`` **table** -- The name of the table in Hbase to write to. +zookeeperQuorum -- The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml +znodeParent /hbase The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml **columnFamily** -- The column family in Hbase to write to. batchSize 100 Number of events to be written per txn. timeout -- The length of time (in milliseconds) the sink waits for acks from hbase for @@ -1544,6 +1545,14 @@ serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer serializer.* -- Properties to be passed to the serializer. ================ ============================================================ ==================================================================================== +Note that this sink takes the Zookeeper Quorum and parent znode information in +the configuration. Zookeeper Quorum and parent znode configuration may be +specified in the Flume configuration file; alternatively, these configuration +values are taken from the first hbase-site.xml file in the classpath. + +If these are not provided in the configuration, then the sink +will read this information from the first hbase-site.xml file in the classpath. + Example for agent named a1: .. 
code-block:: properties http://git-wip-us.apache.org/repos/asf/flume/blob/6d63f977/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java index 1598f26..6b34873 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; @@ -105,14 +106,17 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { private volatile boolean open = false; private SinkCounter sinkCounter; private long timeout; + private String zkQuorum; + private String zkBaseDir; public AsyncHBaseSink(){ - conf = HBaseConfiguration.create(); + this(null); } public AsyncHBaseSink(Configuration conf) { this.conf = conf; } + @Override public Status process() throws EventDeliveryException { /* @@ -284,6 +288,27 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { + "Sink will not timeout."); timeout = HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT; } + + zkQuorum = context.getString( + HBaseSinkConfigurationConstants.ZK_QUORUM, "").trim(); + if(!zkQuorum.isEmpty()) { + zkBaseDir = context.getString( + HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, + HBaseSinkConfigurationConstants.DEFAULT_ZK_ZNODE_PARENT); + } else { + if (conf == null) { //In tests, we pass the conf in. 
+ conf = HBaseConfiguration.create(); + } + zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM); + zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT); + } + Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(), + "The Zookeeper quorum cannot be null and should be specified."); + } + + @VisibleForTesting + boolean isConfNull() { + return conf == null; } @Override public void start(){ @@ -291,8 +316,6 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { + "before calling start on an old instance."); sinkCounter.start(); sinkCounter.incrementConnectionCreatedCount(); - String zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM); - String zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT); if(zkBaseDir != null){ client = new HBaseClient(zkQuorum, zkBaseDir); } else { @@ -344,6 +367,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { sinkCounter.incrementConnectionClosedCount(); sinkCounter.stop(); client = null; + conf = null; open = false; super.stop(); } http://git-wip-us.apache.org/repos/asf/flume/blob/6d63f977/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java index 463c9c3..fad026c 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java @@ -17,6 +17,9 @@ * under the License. 
*/ package org.apache.flume.sink.hbase; + +import org.apache.hadoop.hbase.HConstants; + /** * Constants used for configuration of HBaseSink and AsyncHBaseSink * @@ -52,4 +55,11 @@ public class HBaseSinkConfigurationConstants { public static final String CONFIG_PRINCIPAL = "kerberosPrincipal"; + public static final String ZK_QUORUM = "zookeeperQuorum"; + + public static final String ZK_ZNODE_PARENT = "znodeParent"; + + public static final String DEFAULT_ZK_ZNODE_PARENT = + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT; + } http://git-wip-us.apache.org/repos/asf/flume/blob/6d63f977/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java index c835172..1f61406 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java @@ -167,7 +167,7 @@ public class TestAsyncHBaseSink { channel.put(e); tx.commit(); tx.close(); - + Assert.assertFalse(sink.isConfNull()); sink.process(); sink.stop(); HTable table = new HTable(testUtility.getConfiguration(), tableName); @@ -196,6 +196,7 @@ public class TestAsyncHBaseSink { } tx.commit(); tx.close(); + Assert.assertFalse(sink.isConfNull()); sink.process(); sink.stop(); HTable table = new HTable(testUtility.getConfiguration(), tableName); @@ -242,6 +243,64 @@ public class TestAsyncHBaseSink { count++; status = sink.process(); } + Assert.assertFalse(sink.isConfNull()); + sink.stop(); + Assert.assertEquals(2, count); + HTable table = new HTable(testUtility.getConfiguration(), tableName); + byte[][] results = getResults(table, 3); + byte[] out; + int found = 0; 
+ for(int i = 0; i < 3; i++){ + for(int j = 0; j < 3; j++){ + if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){ + found++; + break; + } + } + } + Assert.assertEquals(3, found); + out = results[3]; + Assert.assertArrayEquals(Longs.toByteArray(3), out); + testUtility.deleteTable(tableName.getBytes()); + } + + @Test + public void testWithoutConfigurationObject() throws Exception{ + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + ctx.put("batchSize", "2"); + ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, + testUtility.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM)); + ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, + testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + AsyncHBaseSink sink = new AsyncHBaseSink(); + Configurables.configure(sink, ctx); + // Reset context to values usable by other tests. + ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, null); + ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,null); + ctx.put("batchSize", "100"); + Channel channel = new MemoryChannel(); + Configurables.configure(channel, new Context()); + sink.setChannel(channel); + sink.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + for(int i = 0; i < 3; i++){ + Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); + channel.put(e); + } + tx.commit(); + tx.close(); + int count = 0; + Status status = Status.READY; + while(status != Status.BACKOFF){ + count++; + status = sink.process(); + } + /* + * Make sure that the configuration was picked up from the context itself + * and not from a configuration object which was created by the sink. 
+ */ + Assert.assertTrue(sink.isConfNull()); sink.stop(); Assert.assertEquals(2, count); HTable table = new HTable(testUtility.getConfiguration(), tableName); @@ -282,6 +341,7 @@ public class TestAsyncHBaseSink { tx.commit(); tx.close(); sink.process(); + Assert.assertFalse(sink.isConfNull()); HTable table = new HTable(testUtility.getConfiguration(), tableName); byte[][] results = getResults(table, 2); byte[] out; @@ -330,6 +390,7 @@ public class TestAsyncHBaseSink { tx.commit(); tx.close(); sink.process(); + Assert.assertFalse(sink.isConfNull()); HTable table = new HTable(testUtility.getConfiguration(), tableName); byte[][] results = getResults(table, 2); byte[] out;
