Repository: flume Updated Branches: refs/heads/trunk c7b95a746 -> ed288acba
FLUME-3186. Make asyncHbaseClient config parameters available from Flume config This patch adds the ability to set the asyncHbaseClient's config parameters via the Flume configuration. This closes #178 Reviewers: Ferenc Szabo, Denes Arvay (Miklos Csanady via Denes Arvay) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ed288acb Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ed288acb Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ed288acb Branch: refs/heads/trunk Commit: ed288acba39bfd611c10b338e36224c1415c2c4c Parents: c7b95a7 Author: Miklos Csanady <[email protected]> Authored: Thu Oct 19 18:21:16 2017 +0200 Committer: Denes Arvay <[email protected]> Committed: Thu Oct 19 18:25:15 2017 +0200 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 4 + .../apache/flume/sink/hbase/AsyncHBaseSink.java | 19 +++- .../hbase/HBaseSinkConfigurationConstants.java | 6 ++ .../hbase/TestAsyncHBaseSinkConfiguration.java | 107 +++++++++++++++++++ 4 files changed, 135 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ed288acb/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 6d7085c..73ed7b8 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2540,6 +2540,10 @@ timeout 60000 all events in a transaction. serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer serializer.* -- Properties to be passed to the serializer. +async.* -- Properties to be passed to asyncHbase library. + These properties have precedence over the old ``zookeeperQuorum`` and ``znodeParent`` values. + You can find the list of the available properties at + `the documentation page of AsyncHBase <http://opentsdb.github.io/asynchbase/docs/build/html/configuration.html#properties>`_. =================== ============================================================ ==================================================================================== Note that this sink takes the Zookeeper Quorum and parent znode information in http://git-wip-us.apache.org/repos/asf/flume/blob/ed288acb/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java index c202a57..881f661 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.hbase.async.AtomicIncrementRequest; +import org.hbase.async.Config; import org.hbase.async.HBaseClient; import org.hbase.async.PutRequest; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; @@ -107,6 +108,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { private long batchSize; private static final Logger logger = LoggerFactory.getLogger(AsyncHBaseSink.class); private AsyncHbaseEventSerializer serializer; + + @VisibleForTesting + Config asyncClientConfig; private String eventSerializerType; private Context serializerContext; private HBaseClient client; @@ -422,6 +426,19 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { context.getInteger(HBaseSinkConfigurationConstants.CONFIG_MAX_CONSECUTIVE_FAILS, HBaseSinkConfigurationConstants.DEFAULT_MAX_CONSECUTIVE_FAILS); + + Map<String, String> asyncProperties + = context.getSubProperties(HBaseSinkConfigurationConstants.ASYNC_PREFIX); + asyncClientConfig = new Config(); + asyncClientConfig.overrideConfig( + HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY, zkQuorum + ); + asyncClientConfig.overrideConfig( + HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY, zkBaseDir + ); + for (String property: asyncProperties.keySet()) { + asyncClientConfig.overrideConfig(property, asyncProperties.get(property)); + } } @VisibleForTesting @@ -450,7 +467,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder() .setNameFormat(this.getName() + " HBase Call Pool").build()); logger.info("Callback pool created"); - client = new HBaseClient(zkQuorum, zkBaseDir, + client = new HBaseClient(asyncClientConfig, new NioClientSocketChannelFactory(sinkCallbackPool, sinkCallbackPool)); final CountDownLatch latch = new CountDownLatch(1); http://git-wip-us.apache.org/repos/asf/flume/blob/ed288acb/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java index 5560624..f9ca4bf 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java @@ -74,4 +74,10 @@ public class HBaseSinkConfigurationConstants { public static final String CONFIG_MAX_CONSECUTIVE_FAILS = "maxConsecutiveFails"; + public static final String ASYNC_PREFIX = "async."; + + public static final String ASYNC_ZK_QUORUM_KEY = "hbase.zookeeper.quorum"; + + public static final String ASYNC_ZK_BASEPATH_KEY = "hbase.zookeeper.znode.parent"; + } http://git-wip-us.apache.org/repos/asf/flume/blob/ed288acb/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java new file mode 100644 index 0000000..d4cc360 --- /dev/null +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.sink.hbase; + +import org.apache.flume.Context; +import org.apache.flume.conf.Configurables; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class TestAsyncHBaseSinkConfiguration { + + private static final String tableName = "TestHbaseSink"; + private static final String columnFamily = "TestColumnFamily"; + private static Context ctx = new Context(); + + + @Before + public void setUp() throws Exception { + Map<String, String> ctxMap = new HashMap<>(); + ctxMap.put("table", tableName); + ctxMap.put("columnFamily", columnFamily); + ctx = new Context(); + ctx.putAll(ctxMap); + } + + + //FLUME-3186 Make asyncHbaseClient configuration parameters available from flume config + @Test + public void testAsyncConfigBackwardCompatibility() throws Exception { + //Old way: zookeeperQuorum + String oldZkQuorumTestValue = "old_zookeeper_quorum_test_value"; + String oldZkZnodeParentValue = "old_zookeeper_znode_parent_test_value"; + ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, oldZkQuorumTestValue); + ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,oldZkZnodeParentValue); + AsyncHBaseSink sink = new AsyncHBaseSink(); + Configurables.configure(sink, ctx); + Assert.assertEquals( + oldZkQuorumTestValue, + sink.asyncClientConfig.getString(HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY)); + Assert.assertEquals( + oldZkZnodeParentValue, + sink.asyncClientConfig.getString( + HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY)); + } + + @Test + public void testAsyncConfigNewStyleOverwriteOldOne() throws Exception { + //Old way: zookeeperQuorum + String oldZkQuorumTestValue = "old_zookeeper_quorum_test_value"; + String oldZkZnodeParentValue = "old_zookeeper_znode_parent_test_value"; + ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, oldZkQuorumTestValue); + ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,oldZkZnodeParentValue); + + String newZkQuorumTestValue = "new_zookeeper_quorum_test_value"; + String newZkZnodeParentValue = "new_zookeeper_znode_parent_test_value"; + ctx.put( + HBaseSinkConfigurationConstants.ASYNC_PREFIX + + HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY, + newZkQuorumTestValue); + ctx.put( + HBaseSinkConfigurationConstants.ASYNC_PREFIX + + HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY, + newZkZnodeParentValue); + AsyncHBaseSink sink = new AsyncHBaseSink(); + Configurables.configure(sink, ctx); + Assert.assertEquals( + newZkQuorumTestValue, + sink.asyncClientConfig.getString(HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY)); + Assert.assertEquals( + newZkZnodeParentValue, + sink.asyncClientConfig.getString( + HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY)); + } + + @Test + public void testAsyncConfigAnyKeyCanBePassed() throws Exception { + String valueOfANewProp = "vale of the new property"; + String keyOfANewProp = "some.key.to.be.passed"; + ctx.put(HBaseSinkConfigurationConstants.ASYNC_PREFIX + keyOfANewProp, valueOfANewProp); + AsyncHBaseSink sink = new AsyncHBaseSink(); + Configurables.configure(sink, ctx); + Assert.assertEquals(valueOfANewProp, sink.asyncClientConfig.getString(keyOfANewProp)); + } +} + +
