Updated Branches: refs/heads/trunk 41f1e8afb -> 5d49eeb73
FLUME-2134. AsyncHbase Sink should use ZKConfig.getZKQuorumServersString plus test fixes on Windows (Roshan Naik via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5d49eeb7 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5d49eeb7 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5d49eeb7 Branch: refs/heads/trunk Commit: 5d49eeb734d1c55aaf48219699706851d9e820b5 Parents: 41f1e8a Author: Hari Shreedharan <[email protected]> Authored: Mon Aug 5 17:25:53 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Mon Aug 5 17:25:53 2013 -0700 ---------------------------------------------------------------------- .../apache/flume/sink/hbase/AsyncHBaseSink.java | 3 +- .../flume/sink/hbase/TestAsyncHBaseSink.java | 83 ++------------------ 2 files changed, 8 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/5d49eeb7/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java index 7020fcd..5e297b1 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java @@ -37,6 +37,7 @@ import org.apache.flume.sink.AbstractSink; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.hbase.async.AtomicIncrementRequest; import org.hbase.async.HBaseClient; import org.hbase.async.PutRequest; @@ -318,7 +319,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { if (conf == null) { //In tests, we pass the conf in. conf = HBaseConfiguration.create(); } - zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM); + zkQuorum = ZKConfig.getZKQuorumServersString(conf); zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); } http://git-wip-us.apache.org/repos/asf/flume/blob/5d49eeb7/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java index 7ddfdae..a0c04eb 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java @@ -19,14 +19,11 @@ package org.apache.flume.sink.hbase; - -import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import org.apache.commons.io.FileUtils; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; @@ -37,35 +34,26 @@ import org.apache.flume.Sink.Status; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import com.google.common.io.Files; import com.google.common.primitives.Longs; -import java.lang.reflect.Method; import org.junit.After; public class TestAsyncHBaseSink { - private static HBaseTestingUtility testUtility; - private static MiniZooKeeperCluster zookeeperCluster; - private static MiniHBaseCluster hbaseCluster; - private static String workDir = Files.createTempDir().getAbsolutePath(); + private static HBaseTestingUtility testUtility = new HBaseTestingUtility(); private static String tableName = "TestHbaseSink"; private static String columnFamily = "TestColumnFamily"; @@ -78,65 +66,8 @@ public class TestAsyncHBaseSink { @BeforeClass public static void setUp() throws Exception { + testUtility.startMiniCluster(); - /* - * Borrowed from HCatalog ManyMiniCluster.java - * https://svn.apache.org/repos/asf/incubator/hcatalog/trunk/ - * storage-handlers/hbase/src/test/org/apache/hcatalog/ - * hbase/ManyMiniCluster.java - * - */ - String hbaseDir = new File(workDir,"hbase").getAbsolutePath(); - String hbaseRoot = "file://" + hbaseDir; - Configuration hbaseConf = HBaseConfiguration.create(); - - hbaseConf.set(HConstants.HBASE_DIR, hbaseRoot); - hbaseConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181); - hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "0.0.0.0"); - hbaseConf.setInt("hbase.master.info.port", -1); - hbaseConf.setInt("hbase.zookeeper.property.maxClientCnxns",500); - String zookeeperDir = new File(workDir,"zk").getAbsolutePath(); - int zookeeperPort = 2181; - zookeeperCluster = new MiniZooKeeperCluster(); - Method m; - Class<?> zkParam[] = {Integer.TYPE}; - try{ - m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort", - zkParam); - } catch (NoSuchMethodException e) { - m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort", - zkParam); - } - - m.invoke(zookeeperCluster, new Object[] {new Integer(zookeeperPort)}); - zookeeperCluster.startup(new File(zookeeperDir)); - hbaseCluster = new MiniHBaseCluster(hbaseConf, 1); - HMaster master = hbaseCluster.getMaster(); - Object serverName = master.getServerName(); - String hostAndPort; - if(serverName instanceof String) { - System.out.println("Server name is string, using HServerAddress."); - m = HMaster.class.getDeclaredMethod("getMasterAddress", - new Class<?>[]{}); - Class<?> clazz = Class.forName("org.apache.hadoop.hbase.HServerAddress"); - /* - * Call method to get server address - */ - Object serverAddr = clazz.cast(m.invoke(master, new Object[]{})); - //returns the address as hostname:port - hostAndPort = serverAddr.toString(); - } else { - System.out.println("ServerName is org.apache.hadoop.hbase.ServerName," + - "using getHostAndPort()"); - Class<?> clazz = Class.forName("org.apache.hadoop.hbase.ServerName"); - m = clazz.getDeclaredMethod("getHostAndPort", new Class<?>[] {}); - hostAndPort = m.invoke(serverName, new Object[]{}).toString(); - } - - hbaseConf.set("hbase.master", hostAndPort); - testUtility = new HBaseTestingUtility(hbaseConf); - testUtility.setZkCluster(zookeeperCluster); - hbaseCluster.startMaster(); Map<String, String> ctxMap = new HashMap<String, String>(); ctxMap.put("table", tableName); ctxMap.put("columnFamily", columnFamily); @@ -151,9 +82,7 @@ public class TestAsyncHBaseSink { @AfterClass public static void tearDown() throws Exception { - hbaseCluster.shutdown(); - zookeeperCluster.shutdown(); - FileUtils.deleteDirectory(new File(workDir)); + testUtility.shutdownMiniCluster(); } @After @@ -347,7 +276,7 @@ public class TestAsyncHBaseSink { deleteTable = true; ctx.put("batchSize", "2"); ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, - testUtility.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM)); + ZKConfig.getZKQuorumServersString(testUtility.getConfiguration()) ); ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT)); AsyncHBaseSink sink = new AsyncHBaseSink(); @@ -485,7 +414,7 @@ public class TestAsyncHBaseSink { Assert.assertEquals(2, found); out = results[2]; Assert.assertArrayEquals(Longs.toByteArray(2), out); - hbaseCluster.shutdown(); + testUtility.shutdownMiniCluster(); sink.process(); sink.stop(); }
