Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java?rev=1522098&r1=1522097&r2=1522098&view=diff ============================================================================== --- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java (original) +++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java Thu Sep 12 01:21:10 2013 @@ -46,258 +46,258 @@ import java.util.TreeSet; * made is a TODO for sorting using suffixes and the package name. */ public class WriteLock extends ProtocolSupport { - private static final Logger LOG = LoggerFactory.getLogger(WriteLock.class); + private static final Logger LOG = LoggerFactory.getLogger(WriteLock.class); - private final String dir; - private String id; - private ZNodeName idName; - private String ownerId; - private String lastChildId; - private byte[] data = {0x12, 0x34}; - private LockListener callback; - private LockZooKeeperOperation zop; - - /** - * zookeeper contructor for writelock - * @param zookeeper zookeeper client instance - * @param dir the parent path you want to use for locking - * @param acl the acls that you want to use for all the paths, - * if null world read/write is used. - */ - public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl) { - super(zookeeper); - this.dir = dir; - if (acl != null) { - setAcl(acl); + private final String dir; + private String id; + private ZNodeName idName; + private String ownerId; + private String lastChildId; + private byte[] data = {0x12, 0x34}; + private LockListener callback; + private LockZooKeeperOperation zop; + + /** + * zookeeper contructor for writelock + * @param zookeeper zookeeper client instance + * @param dir the parent path you want to use for locking + * @param acl the acls that you want to use for all the paths, + * if null world read/write is used. + */ + public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl) { + super(zookeeper); + this.dir = dir; + if (acl != null) { + setAcl(acl); + } + this.zop = new LockZooKeeperOperation(); + } + + /** + * zookeeper contructor for writelock with callback + * @param zookeeper the zookeeper client instance + * @param dir the parent path you want to use for locking + * @param acl the acls that you want to use for all the paths + * @param callback the call back instance + */ + public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl, + LockListener callback) { + this(zookeeper, dir, acl); + this.callback = callback; + } + + /** + * return the current locklistener + * @return the locklistener + */ + public LockListener getLockListener() { + return this.callback; + } + + /** + * register a different call back listener + * @param callback the call back instance + */ + public void setLockListener(LockListener callback) { + this.callback = callback; + } + + /** + * Removes the lock or associated znode if + * you no longer require the lock. this also + * removes your request in the queue for locking + * in case you do not already hold the lock. + * @throws RuntimeException throws a runtime exception + * if it cannot connect to zookeeper. + */ + public synchronized void unlock() throws RuntimeException { + + if (!isClosed() && id != null) { + // we don't need to retry this operation in the case of failure + // as ZK will remove ephemeral files and we don't wanna hang + // this process when closing if we cannot reconnect to ZK + try { + + ZooKeeperOperation zopdel = new ZooKeeperOperation() { + public boolean execute() throws KeeperException, + InterruptedException { + zookeeper.delete(id, -1); + return Boolean.TRUE; + } + }; + zopdel.execute(); + } catch (InterruptedException e) { + LOG.warn("Caught: " + e, e); + //set that we have been interrupted. + Thread.currentThread().interrupt(); + } catch (KeeperException.NoNodeException e) { + // do nothing + } catch (KeeperException e) { + LOG.warn("Caught: " + e, e); + throw (RuntimeException) new RuntimeException(e.getMessage()). + initCause(e); + } finally { + if (callback != null) { + callback.lockReleased(); } - this.zop = new LockZooKeeperOperation(); - } - - /** - * zookeeper contructor for writelock with callback - * @param zookeeper the zookeeper client instance - * @param dir the parent path you want to use for locking - * @param acl the acls that you want to use for all the paths - * @param callback the call back instance - */ - public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl, - LockListener callback) { - this(zookeeper, dir, acl); - this.callback = callback; - } - - /** - * return the current locklistener - * @return the locklistener - */ - public LockListener getLockListener() { - return this.callback; - } - - /** - * register a different call back listener - * @param callback the call back instance - */ - public void setLockListener(LockListener callback) { - this.callback = callback; + id = null; + } } + } - /** - * Removes the lock or associated znode if - * you no longer require the lock. this also - * removes your request in the queue for locking - * in case you do not already hold the lock. - * @throws RuntimeException throws a runtime exception - * if it cannot connect to zookeeper. - */ - public synchronized void unlock() throws RuntimeException { - - if (!isClosed() && id != null) { - // we don't need to retry this operation in the case of failure - // as ZK will remove ephemeral files and we don't wanna hang - // this process when closing if we cannot reconnect to ZK - try { - - ZooKeeperOperation zopdel = new ZooKeeperOperation() { - public boolean execute() throws KeeperException, - InterruptedException { - zookeeper.delete(id, -1); - return Boolean.TRUE; - } - }; - zopdel.execute(); - } catch (InterruptedException e) { - LOG.warn("Caught: " + e, e); - //set that we have been interrupted. - Thread.currentThread().interrupt(); - } catch (KeeperException.NoNodeException e) { - // do nothing - } catch (KeeperException e) { - LOG.warn("Caught: " + e, e); - throw (RuntimeException) new RuntimeException(e.getMessage()). - initCause(e); - } finally { - if (callback != null) { - callback.lockReleased(); - } - id = null; - } + /** + * the watcher called on + * getting watch while watching + * my predecessor + */ + private class LockWatcher implements Watcher { + public void process(WatchedEvent event) { + // lets either become the leader or watch the new/updated node + LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + + event.getState() + " type " + event.getType()); + try { + lock(); + } catch (Exception e) { + LOG.warn("Failed to acquire lock: " + e, e); + } + } + } + + /** + * a zoookeeper operation that is mainly responsible + * for all the magic required for locking. + */ + private class LockZooKeeperOperation implements ZooKeeperOperation { + + /** find if we have been created earler if not create our node + * + * @param prefix the prefix node + * @param zookeeper teh zookeeper client + * @param dir the dir paretn + * @throws KeeperException + * @throws InterruptedException + */ + private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) + throws KeeperException, InterruptedException { + List<String> names = zookeeper.getChildren(dir, false); + for (String name : names) { + if (name.startsWith(prefix)) { + id = name; + if (LOG.isDebugEnabled()) { + LOG.debug("Found id created last time: " + id); + } + break; } - } + } + if (id == null) { + id = zookeeper.create(dir + "/" + prefix, data, + getAcl(), EPHEMERAL_SEQUENTIAL); - /** - * the watcher called on - * getting watch while watching - * my predecessor - */ - private class LockWatcher implements Watcher { - public void process(WatchedEvent event) { - // lets either become the leader or watch the new/updated node - LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + - event.getState() + " type " + event.getType()); - try { - lock(); - } catch (Exception e) { - LOG.warn("Failed to acquire lock: " + e, e); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Created id: " + id); } + } + } /** - * a zoookeeper operation that is mainly responsible - * for all the magic required for locking. + * the command that is run and retried for actually + * obtaining the lock + * @return if the command was successful or not */ - private class LockZooKeeperOperation implements ZooKeeperOperation { - - /** find if we have been created earler if not create our node - * - * @param prefix the prefix node - * @param zookeeper teh zookeeper client - * @param dir the dir paretn - * @throws KeeperException - * @throws InterruptedException - */ - private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) - throws KeeperException, InterruptedException { - List<String> names = zookeeper.getChildren(dir, false); + public boolean execute() throws KeeperException, InterruptedException { + do { + if (id == null) { + long sessionId = zookeeper.getSessionId(); + String prefix = "x-" + sessionId + "-"; + // lets try look up the current ID if we failed + // in the middle of creating the znode + findPrefixInChildren(prefix, zookeeper, dir); + idName = new ZNodeName(id); + } + if (id != null) { + List<String> names = zookeeper.getChildren(dir, false); + if (names.isEmpty()) { + LOG.warn("No children in: " + dir + " when we've just " + + "created one! Lets recreate it..."); + // lets force the recreation of the id + id = null; + } else { + // lets sort them explicitly (though they do seem to come back in order ususally :) + SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>(); for (String name : names) { - if (name.startsWith(prefix)) { - id = name; - if (LOG.isDebugEnabled()) { - LOG.debug("Found id created last time: " + id); - } - break; - } + //TODO: Just use the suffix to sort. + sortedNames.add(new ZNodeName(dir + "/" + name)); } - if (id == null) { - id = zookeeper.create(dir + "/" + prefix, data, - getAcl(), EPHEMERAL_SEQUENTIAL); - - if (LOG.isDebugEnabled()) { - LOG.debug("Created id: " + id); - } - } - - } - - /** - * the command that is run and retried for actually - * obtaining the lock - * @return if the command was successful or not - */ - public boolean execute() throws KeeperException, InterruptedException { - do { - if (id == null) { - long sessionId = zookeeper.getSessionId(); - String prefix = "x-" + sessionId + "-"; - // lets try look up the current ID if we failed - // in the middle of creating the znode - findPrefixInChildren(prefix, zookeeper, dir); - idName = new ZNodeName(id); - } - if (id != null) { - List<String> names = zookeeper.getChildren(dir, false); - if (names.isEmpty()) { - LOG.warn("No children in: " + dir + " when we've just " + - "created one! Lets recreate it..."); - // lets force the recreation of the id - id = null; - } else { - // lets sort them explicitly (though they do seem to come back in order ususally :) - SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>(); - for (String name : names) { - //TODO: Just use the suffix to sort. - sortedNames.add(new ZNodeName(dir + "/" + name)); - } - ownerId = sortedNames.first().getName(); - SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName); - if (!lessThanMe.isEmpty()) { - ZNodeName lastChildName = lessThanMe.last(); - lastChildId = lastChildName.getName(); - if (LOG.isDebugEnabled()) { - LOG.debug("watching less than me node: " + lastChildId); - } - Stat stat = zookeeper.exists(lastChildId, new LockWatcher()); - if (stat != null) { - return Boolean.FALSE; - } else { - LOG.warn("Could not find the" + - " stats for less than me: " + lastChildName.getName()); - } - } else { - if (isOwner()) { - if (callback != null) { - callback.lockAcquired(); - } - return Boolean.TRUE; - } - } - } + ownerId = sortedNames.first().getName(); + SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName); + if (!lessThanMe.isEmpty()) { + ZNodeName lastChildName = lessThanMe.last(); + lastChildId = lastChildName.getName(); + if (LOG.isDebugEnabled()) { + LOG.debug("watching less than me node: " + lastChildId); + } + Stat stat = zookeeper.exists(lastChildId, new LockWatcher()); + if (stat != null) { + return Boolean.FALSE; + } else { + LOG.warn("Could not find the" + + " stats for less than me: " + lastChildName.getName()); + } + } else { + if (isOwner()) { + if (callback != null) { + callback.lockAcquired(); } + return Boolean.TRUE; + } } - while (id == null); - return Boolean.FALSE; - } - } - - ; - - /** - * Attempts to acquire the exclusive write lock returning whether or not it was - * acquired. Note that the exclusive lock may be acquired some time later after - * this method has been invoked due to the current lock owner going away. - */ - public synchronized boolean lock() throws KeeperException, InterruptedException { - if (isClosed()) { - return false; + } } - ensurePathExists(dir); - - return (Boolean) retryOperation(zop); - } - - /** - * return the parent dir for lock - * @return the parent dir used for locks. - */ - public String getDir() { - return dir; - } - - /** - * Returns true if this node is the owner of the - * lock (or the leader) - */ - public boolean isOwner() { - return id != null && ownerId != null && id.equals(ownerId); - } - - /** - * return the id for this lock - * @return the id for this lock - */ - public String getId() { - return this.id; - } + } + while (id == null); + return Boolean.FALSE; + } + } + + ; + + /** + * Attempts to acquire the exclusive write lock returning whether or not it was + * acquired. Note that the exclusive lock may be acquired some time later after + * this method has been invoked due to the current lock owner going away. + */ + public synchronized boolean lock() throws KeeperException, InterruptedException { + if (isClosed()) { + return false; + } + ensurePathExists(dir); + + return (Boolean) retryOperation(zop); + } + + /** + * return the parent dir for lock + * @return the parent dir used for locks. + */ + public String getDir() { + return dir; + } + + /** + * Returns true if this node is the owner of the + * lock (or the leader) + */ + public boolean isOwner() { + return id != null && ownerId != null && id.equals(ownerId); + } + + /** + * return the id for this lock + * @return the id for this lock + */ + public String getId() { + return this.id; + } }
Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java?rev=1522098&r1=1522097&r2=1522098&view=diff ============================================================================== --- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java (original) +++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java Thu Sep 12 01:21:10 2013 @@ -29,85 +29,85 @@ import org.slf4j.LoggerFactory; * change in package name. */ public class ZNodeName implements Comparable<ZNodeName> { - private final String name; - private String prefix; - private int sequence = -1; - private static final Logger LOG = LoggerFactory.getLogger(ZNodeName.class); - - public ZNodeName(String name) { - if (name == null) { - throw new NullPointerException("id cannot be null"); - } - this.name = name; - this.prefix = name; - int idx = name.lastIndexOf('-'); - if (idx >= 0) { - this.prefix = name.substring(0, idx); - try { - this.sequence = Integer.parseInt(name.substring(idx + 1)); - // If an exception occurred we misdetected a sequence suffix, - // so return -1. - } catch (NumberFormatException e) { - LOG.info("Number format exception for " + idx, e); - } catch (ArrayIndexOutOfBoundsException e) { - LOG.info("Array out of bounds for " + idx, e); - } - } - } - - @Override - public String toString() { - return name.toString(); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ZNodeName sequence = (ZNodeName) o; - - if (!name.equals(sequence.name)) return false; - - return true; - } - - @Override - public int hashCode() { - return name.hashCode() + 37; - } - - public int compareTo(ZNodeName that) { - int answer = this.prefix.compareTo(that.prefix); - if (answer == 0) { - int s1 = this.sequence; - int s2 = that.sequence; - if (s1 == -1 && s2 == -1) { - return this.name.compareTo(that.name); - } - answer = s1 == -1 ? 1 : s2 == -1 ? -1 : s1 - s2; - } - return answer; - } - - /** - * Returns the name of the znode - */ - public String getName() { - return name; - } - - /** - * Returns the sequence number - */ - public int getZNodeName() { - return sequence; - } - - /** - * Returns the text prefix before the sequence number - */ - public String getPrefix() { - return prefix; - } + private final String name; + private String prefix; + private int sequence = -1; + private static final Logger LOG = LoggerFactory.getLogger(ZNodeName.class); + + public ZNodeName(String name) { + if (name == null) { + throw new NullPointerException("id cannot be null"); + } + this.name = name; + this.prefix = name; + int idx = name.lastIndexOf('-'); + if (idx >= 0) { + this.prefix = name.substring(0, idx); + try { + this.sequence = Integer.parseInt(name.substring(idx + 1)); + // If an exception occurred we misdetected a sequence suffix, + // so return -1. + } catch (NumberFormatException e) { + LOG.info("Number format exception for " + idx, e); + } catch (ArrayIndexOutOfBoundsException e) { + LOG.info("Array out of bounds for " + idx, e); + } + } + } + + @Override + public String toString() { + return name.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ZNodeName sequence = (ZNodeName) o; + + if (!name.equals(sequence.name)) return false; + + return true; + } + + @Override + public int hashCode() { + return name.hashCode() + 37; + } + + public int compareTo(ZNodeName that) { + int answer = this.prefix.compareTo(that.prefix); + if (answer == 0) { + int s1 = this.sequence; + int s2 = that.sequence; + if (s1 == -1 && s2 == -1) { + return this.name.compareTo(that.name); + } + answer = s1 == -1 ? 1 : s2 == -1 ? -1 : s1 - s2; + } + return answer; + } + + /** + * Returns the name of the znode + */ + public String getName() { + return name; + } + + /** + * Returns the sequence number + */ + public int getZNodeName() { + return sequence; + } + + /** + * Returns the text prefix before the sequence number + */ + public String getPrefix() { + return prefix; + } } Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java?rev=1522098&r1=1522097&r2=1522098&view=diff ============================================================================== --- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java (original) +++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java Thu Sep 12 01:21:10 2013 @@ -29,13 +29,13 @@ import org.apache.zookeeper.KeeperExcept */ public interface ZooKeeperOperation { - /** - * Performs the operation - which may be involved multiple times if the connection - * to ZooKeeper closes during this operation - * - * @return the result of the operation or null - * @throws KeeperException - * @throws InterruptedException - */ - public boolean execute() throws KeeperException, InterruptedException; + /** + * Performs the operation - which may be involved multiple times if the connection + * to ZooKeeper closes during this operation + * + * @return the result of the operation or null + * @throws KeeperException + * @throws InterruptedException + */ + public boolean execute() throws KeeperException, InterruptedException; } Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java?rev=1522098&r1=1522097&r2=1522098&view=diff ============================================================================== --- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java (original) +++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java Thu Sep 12 01:21:10 2013 @@ -44,326 +44,327 @@ import java.net.ServerSocket; */ public class ManyMiniCluster { - //MR stuff - private boolean miniMRClusterEnabled; - private MiniMRCluster mrCluster; - private int numTaskTrackers; - private JobConf jobConf; + //MR stuff + private boolean miniMRClusterEnabled; + private MiniMRCluster mrCluster; + private int numTaskTrackers; + private JobConf jobConf; + + //HBase stuff + private boolean miniHBaseClusterEnabled; + private MiniHBaseCluster hbaseCluster; + private String hbaseRoot; + private Configuration hbaseConf; + private String hbaseDir; + + //ZK Stuff + private boolean miniZookeeperClusterEnabled; + private MiniZooKeeperCluster zookeeperCluster; + private int zookeeperPort; + private String zookeeperDir; + + //DFS Stuff + private MiniDFSCluster dfsCluster; + + //Hive Stuff + private boolean miniHiveMetastoreEnabled; + private HiveConf hiveConf; + private HiveMetaStoreClient hiveMetaStoreClient; + + private final File workDir; + private boolean started = false; + + + /** + * create a cluster instance using a builder which will expose configurable options + * @param workDir working directory ManyMiniCluster will use for all of it's *Minicluster instances + * @return a Builder instance + */ + public static Builder create(File workDir) { + return new Builder(workDir); + } + + private ManyMiniCluster(Builder b) { + workDir = b.workDir; + numTaskTrackers = b.numTaskTrackers; + hiveConf = b.hiveConf; + jobConf = b.jobConf; + hbaseConf = b.hbaseConf; + miniMRClusterEnabled = b.miniMRClusterEnabled; + miniHBaseClusterEnabled = b.miniHBaseClusterEnabled; + miniHiveMetastoreEnabled = b.miniHiveMetastoreEnabled; + miniZookeeperClusterEnabled = b.miniZookeeperClusterEnabled; + } + + protected synchronized void start() { + try { + if (!started) { + FileUtil.fullyDelete(workDir); + if (miniMRClusterEnabled) { + setupMRCluster(); + } + if (miniZookeeperClusterEnabled || miniHBaseClusterEnabled) { + miniZookeeperClusterEnabled = true; + setupZookeeper(); + } + if (miniHBaseClusterEnabled) { + setupHBaseCluster(); + } + if (miniHiveMetastoreEnabled) { + setUpMetastore(); + } + } + } catch (Exception e) { + throw new IllegalStateException("Failed to setup cluster", e); + } + } + + protected synchronized void stop() { + if (hbaseCluster != null) { + HConnectionManager.deleteAllConnections(true); + try { + hbaseCluster.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + hbaseCluster = null; + } + if (zookeeperCluster != null) { + try { + zookeeperCluster.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + zookeeperCluster = null; + } + if (mrCluster != null) { + try { + mrCluster.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + mrCluster = null; + } + if (dfsCluster != null) { + try { + dfsCluster.getFileSystem().close(); + dfsCluster.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + dfsCluster = null; + } + try { + FileSystem.closeAll(); + } catch (IOException e) { + e.printStackTrace(); + } + started = false; + } + + /** + * @return Configuration of mini HBase cluster + */ + public Configuration getHBaseConf() { + return HBaseConfiguration.create(hbaseConf); + } + + /** + * @return Configuration of mini MR cluster + */ + public Configuration getJobConf() { + return new Configuration(jobConf); + } + + /** + * @return Configuration of Hive Metastore, this is a standalone not a daemon + */ + public HiveConf getHiveConf() { + return new HiveConf(hiveConf); + } + + /** + * @return Filesystem used by MiniMRCluster and MiniHBaseCluster + */ + public FileSystem getFileSystem() { + try { + return FileSystem.get(jobConf); + } catch (IOException e) { + throw new IllegalStateException("Failed to get FileSystem", e); + } + } + + /** + * @return Metastore client instance + */ + public HiveMetaStoreClient getHiveMetaStoreClient() { + return hiveMetaStoreClient; + } + + private void setupMRCluster() { + try { + final int jobTrackerPort = findFreePort(); + final int taskTrackerPort = findFreePort(); + + if (jobConf == null) + jobConf = new JobConf(); + + jobConf.setInt("mapred.submit.replication", 1); + jobConf.set("yarn.scheduler.capacity.root.queues", "default"); + jobConf.set("yarn.scheduler.capacity.root.default.capacity", "100"); + //conf.set("hadoop.job.history.location",new File(workDir).getAbsolutePath()+"/history"); + System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath()); + + mrCluster = new MiniMRCluster(jobTrackerPort, + taskTrackerPort, + numTaskTrackers, + getFileSystem().getUri().toString(), + numTaskTrackers, + null, + null, + null, + jobConf); + + jobConf = mrCluster.createJobConf(); + } catch (IOException e) { + throw new IllegalStateException("Failed to Setup MR Cluster", e); + } + } + + private void setupZookeeper() { + try { + zookeeperDir = new File(workDir, "zk").getAbsolutePath(); + zookeeperPort = findFreePort(); + zookeeperCluster = new MiniZooKeeperCluster(); + zookeeperCluster.setDefaultClientPort(zookeeperPort); + zookeeperCluster.startup(new File(zookeeperDir)); + } catch (Exception e) { + throw new IllegalStateException("Failed to Setup Zookeeper Cluster", e); + } + } + + private void setupHBaseCluster() { + final int numRegionServers = 1; + + try { + hbaseDir = new File(workDir, "hbase").toString(); + hbaseDir = hbaseDir.replaceAll("\\\\", "/"); + hbaseRoot = "file://" + hbaseDir; + + if (hbaseConf == null) + hbaseConf = HBaseConfiguration.create(); + + hbaseConf.set("hbase.rootdir", hbaseRoot); + hbaseConf.set("hbase.master", "local"); + hbaseConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zookeeperPort); + hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1"); + hbaseConf.setInt("hbase.master.port", findFreePort()); + hbaseConf.setInt("hbase.master.info.port", -1); + hbaseConf.setInt("hbase.regionserver.port", findFreePort()); + hbaseConf.setInt("hbase.regionserver.info.port", -1); + + hbaseCluster = new MiniHBaseCluster(hbaseConf, numRegionServers); + hbaseConf.set("hbase.master", hbaseCluster.getMaster().getServerName().getHostAndPort()); + //opening the META table ensures that cluster is running + new HTable(hbaseConf, HConstants.META_TABLE_NAME); + } catch (Exception e) { + throw new IllegalStateException("Failed to setup HBase Cluster", e); + } + } + + private void setUpMetastore() throws Exception { + if (hiveConf == null) + hiveConf = new HiveConf(this.getClass()); + + //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook + //is present only in the ql/test directory + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, + "jdbc:derby:" + new File(workDir + "/metastore_db") + ";create=true"); + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.toString(), + new File(workDir, "warehouse").toString()); + //set where derby logs + File derbyLogFile = new File(workDir + "/derby.log"); + derbyLogFile.createNewFile(); + System.setProperty("derby.stream.error.file", derbyLogFile.getPath()); - //HBase stuff - private boolean miniHBaseClusterEnabled; - private MiniHBaseCluster hbaseCluster; - private String hbaseRoot; - private Configuration hbaseConf; - private String hbaseDir; - //ZK Stuff - private boolean miniZookeeperClusterEnabled; - private MiniZooKeeperCluster zookeeperCluster; - private int zookeeperPort; - private String zookeeperDir; +// Driver driver = new Driver(hiveConf); +// SessionState.start(new CliSessionState(hiveConf)); - //DFS Stuff - private MiniDFSCluster dfsCluster; + hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf); + } - //Hive Stuff - private boolean miniHiveMetastoreEnabled; + private static int findFreePort() throws IOException { + ServerSocket server = new ServerSocket(0); + int port = server.getLocalPort(); + server.close(); + return port; + } + + public static class Builder { + private File workDir; + private int numTaskTrackers = 1; + private JobConf jobConf; + private Configuration hbaseConf; private HiveConf hiveConf; - private HiveMetaStoreClient hiveMetaStoreClient; - private final File workDir; - private boolean started = false; + private boolean miniMRClusterEnabled = true; + private boolean miniHBaseClusterEnabled = true; + private boolean miniHiveMetastoreEnabled = true; + private boolean miniZookeeperClusterEnabled = true; - /** - * create a cluster instance using a builder which will expose configurable options - * @param workDir working directory ManyMiniCluster will use for all of it's *Minicluster instances - * @return a Builder instance - */ - public static Builder create(File workDir) { - return new Builder(workDir); - } - - private ManyMiniCluster(Builder b) { - workDir = b.workDir; - numTaskTrackers = b.numTaskTrackers; - hiveConf = b.hiveConf; - jobConf = b.jobConf; - hbaseConf = b.hbaseConf; - miniMRClusterEnabled = b.miniMRClusterEnabled; - miniHBaseClusterEnabled = b.miniHBaseClusterEnabled; - miniHiveMetastoreEnabled = b.miniHiveMetastoreEnabled; - miniZookeeperClusterEnabled = b.miniZookeeperClusterEnabled; - } - - protected synchronized void start() { - try { - if (!started) { - FileUtil.fullyDelete(workDir); - if (miniMRClusterEnabled) { - setupMRCluster(); - } - if (miniZookeeperClusterEnabled || miniHBaseClusterEnabled) { - miniZookeeperClusterEnabled = true; - setupZookeeper(); - } - if (miniHBaseClusterEnabled) { - setupHBaseCluster(); - } - if (miniHiveMetastoreEnabled) { - setUpMetastore(); - } - } - } catch (Exception e) { - throw new IllegalStateException("Failed to setup cluster", e); - } + private Builder(File workDir) { + this.workDir = workDir; } - protected synchronized void stop() { - if (hbaseCluster != null) { - HConnectionManager.deleteAllConnections(true); - try { - hbaseCluster.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - hbaseCluster = null; - } - if (zookeeperCluster != null) { - try { - zookeeperCluster.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - zookeeperCluster = null; - } - if (mrCluster != null) { - try { - mrCluster.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - mrCluster = null; - } - if (dfsCluster != null) { - try { - dfsCluster.getFileSystem().close(); - dfsCluster.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - dfsCluster = null; - } - try { - FileSystem.closeAll(); - } catch (IOException e) { - e.printStackTrace(); - } - started = false; + public Builder numTaskTrackers(int num) { + numTaskTrackers = num; + return this; } - /** - * @return Configuration of mini HBase cluster - */ - public Configuration getHBaseConf() { - return HBaseConfiguration.create(hbaseConf); - } - - /** - * @return Configuration of mini MR cluster - */ - public Configuration getJobConf() { - return new Configuration(jobConf); - } - - /** - * @return Configuration of Hive Metastore, this is a standalone not a daemon - */ - public HiveConf getHiveConf() { - return new HiveConf(hiveConf); - } - - /** - * @return Filesystem used by MiniMRCluster and MiniHBaseCluster - */ - public FileSystem getFileSystem() { - try { - return FileSystem.get(jobConf); - } catch (IOException e) { - throw new IllegalStateException("Failed to get FileSystem", e); - } + public Builder jobConf(JobConf jobConf) { + this.jobConf = jobConf; + return this; } - /** - * @return Metastore client instance - */ - public HiveMetaStoreClient getHiveMetaStoreClient() { - return hiveMetaStoreClient; - } - - private void setupMRCluster() { - try { - final int jobTrackerPort = findFreePort(); - final int taskTrackerPort = findFreePort(); - - if (jobConf == null) - jobConf = new JobConf(); - - jobConf.setInt("mapred.submit.replication", 1); - jobConf.set("yarn.scheduler.capacity.root.queues", "default"); - jobConf.set("yarn.scheduler.capacity.root.default.capacity", "100"); - //conf.set("hadoop.job.history.location",new File(workDir).getAbsolutePath()+"/history"); - System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath()); - - mrCluster = new MiniMRCluster(jobTrackerPort, - taskTrackerPort, - numTaskTrackers, - getFileSystem().getUri().toString(), - numTaskTrackers, - null, - null, - null, - jobConf); - - jobConf = mrCluster.createJobConf(); - } catch (IOException e) { - throw new IllegalStateException("Failed to Setup MR Cluster", e); - } + public Builder hbaseConf(Configuration hbaseConf) { + this.hbaseConf = hbaseConf; + return this; } - private void setupZookeeper() { - try { - zookeeperDir = new File(workDir, "zk").getAbsolutePath(); - zookeeperPort = findFreePort(); - zookeeperCluster = new MiniZooKeeperCluster(); - zookeeperCluster.setDefaultClientPort(zookeeperPort); - zookeeperCluster.startup(new File(zookeeperDir)); - } catch (Exception e) { - throw new IllegalStateException("Failed to Setup Zookeeper Cluster", e); - } + public Builder hiveConf(HiveConf hiveConf) { + this.hiveConf = hiveConf; + return this; } - private void setupHBaseCluster() { - final int numRegionServers = 1; - - try { - hbaseDir = new File(workDir, "hbase").getAbsolutePath(); - hbaseRoot = "file://" + hbaseDir; - - if (hbaseConf == null) - hbaseConf = HBaseConfiguration.create(); - - hbaseConf.set("hbase.rootdir", hbaseRoot); - hbaseConf.set("hbase.master", "local"); - hbaseConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zookeeperPort); - hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1"); - hbaseConf.setInt("hbase.master.port", findFreePort()); - hbaseConf.setInt("hbase.master.info.port", -1); - hbaseConf.setInt("hbase.regionserver.port", findFreePort()); - hbaseConf.setInt("hbase.regionserver.info.port", -1); - - hbaseCluster = new MiniHBaseCluster(hbaseConf, numRegionServers); - hbaseConf.set("hbase.master", hbaseCluster.getMaster().getServerName().getHostAndPort()); - //opening the META table ensures that cluster is running - new HTable(hbaseConf, HConstants.META_TABLE_NAME); - } catch (Exception e) { - throw new IllegalStateException("Failed to setup HBase Cluster", e); - } + public Builder miniMRClusterEnabled(boolean enabled) { + this.miniMRClusterEnabled = enabled; + return this; } - private void setUpMetastore() throws Exception { - if (hiveConf == null) - hiveConf = new HiveConf(this.getClass()); - - //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook - //is present only in the ql/test directory - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); - hiveConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, - "jdbc:derby:" + new File(workDir + "/metastore_db") + ";create=true"); - hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.toString(), - new File(workDir, "warehouse").toString()); - //set where derby logs - File derbyLogFile = new File(workDir + "/derby.log"); - derbyLogFile.createNewFile(); - System.setProperty("derby.stream.error.file", derbyLogFile.getPath()); - - -// Driver driver = new Driver(hiveConf); -// SessionState.start(new CliSessionState(hiveConf)); - - hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf); + public Builder miniHBaseClusterEnabled(boolean enabled) { + this.miniHBaseClusterEnabled = enabled; + return this; } - private static int findFreePort() throws IOException { - ServerSocket server = new ServerSocket(0); - int port = server.getLocalPort(); - server.close(); - return port; + public Builder miniZookeeperClusterEnabled(boolean enabled) { + this.miniZookeeperClusterEnabled = enabled; + return this; } - public static class Builder { - private File workDir; - private int numTaskTrackers = 1; - private JobConf jobConf; - private Configuration hbaseConf; - private HiveConf hiveConf; - - private boolean miniMRClusterEnabled = true; - private boolean miniHBaseClusterEnabled = true; - private boolean miniHiveMetastoreEnabled = true; - private boolean miniZookeeperClusterEnabled = true; - - - private Builder(File workDir) { - this.workDir = workDir; - } - - public Builder numTaskTrackers(int num) { - numTaskTrackers = num; - return this; - } - - public Builder jobConf(JobConf jobConf) { - this.jobConf = jobConf; - return this; - } - - public Builder hbaseConf(Configuration hbaseConf) { - this.hbaseConf = hbaseConf; - return this; - } - - public Builder hiveConf(HiveConf hiveConf) { - this.hiveConf = hiveConf; - return this; - } - - public Builder miniMRClusterEnabled(boolean enabled) { - this.miniMRClusterEnabled = enabled; - return this; - } - - public Builder miniHBaseClusterEnabled(boolean enabled) { - this.miniHBaseClusterEnabled = enabled; - return this; - } - - public Builder miniZookeeperClusterEnabled(boolean enabled) { - this.miniZookeeperClusterEnabled = enabled; - return this; - } - - public Builder miniHiveMetastoreEnabled(boolean enabled) { - this.miniHiveMetastoreEnabled = enabled; - return this; - } + public Builder miniHiveMetastoreEnabled(boolean enabled) { + this.miniHiveMetastoreEnabled = enabled; + return this; + } - public ManyMiniCluster build() { - return new ManyMiniCluster(this); - } - + public ManyMiniCluster build() { + return new ManyMiniCluster(this); } + + } } Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java?rev=1522098&r1=1522097&r2=1522098&view=diff ============================================================================== --- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java (original) +++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java Thu Sep 12 01:21:10 2013 @@ -43,199 +43,195 @@ import org.junit.BeforeClass; */ public abstract class SkeletonHBaseTest { - protected static String TEST_DIR = System.getProperty("test.data.dir", "./"); + protected static String TEST_DIR = "/tmp/build/test/data/"; - protected final static String DEFAULT_CONTEXT_HANDLE = "default"; + protected final static String DEFAULT_CONTEXT_HANDLE = "default"; - protected static Map<String, Context> contextMap = new HashMap<String, Context>(); - protected static Set<String> tableNames = new HashSet<String>(); + protected static Map<String, Context> contextMap = new HashMap<String, Context>(); + protected static Set<String> tableNames = new HashSet<String>(); - /** - * Allow tests to alter the default MiniCluster configuration. - * (requires static initializer block as all setup here is static) - */ - protected static Configuration testConf = null; + /** + * Allow tests to alter the default MiniCluster configuration. + * (requires static initializer block as all setup here is static) + */ + protected static Configuration testConf = null; + + protected void createTable(String tableName, String[] families) { + try { + HBaseAdmin admin = new HBaseAdmin(getHbaseConf()); + HTableDescriptor tableDesc = new HTableDescriptor(tableName); + for (String family : families) { + HColumnDescriptor columnDescriptor = new HColumnDescriptor(family); + tableDesc.addFamily(columnDescriptor); + } + admin.createTable(tableDesc); + } catch (Exception e) { + e.printStackTrace(); + throw new IllegalStateException(e); + } + + } + + protected String newTableName(String prefix) { + String name = null; + int tries = 100; + do { + name = prefix + "_" + Math.abs(new Random().nextLong()); + } while (tableNames.contains(name) && --tries > 0); + if (tableNames.contains(name)) + throw new IllegalStateException("Couldn't find a unique table name, tableNames size: " + tableNames.size()); + tableNames.add(name); + return name; + } + + + /** + * startup an hbase cluster instance before a test suite runs + */ + @BeforeClass + public static void setup() { + if (!contextMap.containsKey(getContextHandle())) + contextMap.put(getContextHandle(), new Context(getContextHandle())); + + contextMap.get(getContextHandle()).start(); + } + + /** + * shutdown an hbase cluster instance ant the end of the test suite + */ + @AfterClass + public static void tearDown() { + contextMap.get(getContextHandle()).stop(); + } + + /** + * override this with a different context handle if tests suites are run simultaneously + * and ManyMiniCluster instances shouldn't be shared + * @return + */ + public static String getContextHandle() { + return DEFAULT_CONTEXT_HANDLE; + } + + /** + * @return working directory for a given test context, which normally is a test suite + */ + public String getTestDir() { + return contextMap.get(getContextHandle()).getTestDir(); + } + + /** + * @return ManyMiniCluster instance + */ + public ManyMiniCluster getCluster() { + return contextMap.get(getContextHandle()).getCluster(); + } + + /** + * @return configuration of MiniHBaseCluster + */ + public Configuration getHbaseConf() { + return contextMap.get(getContextHandle()).getHbaseConf(); + } + + /** + * @return configuration of MiniMRCluster + */ + public Configuration getJobConf() { + return contextMap.get(getContextHandle()).getJobConf(); + } + + /** + * @return configuration of Hive Metastore + */ + public HiveConf getHiveConf() { + return contextMap.get(getContextHandle()).getHiveConf(); + } + + /** + * @return filesystem used by ManyMiniCluster daemons + */ + public FileSystem getFileSystem() { + return contextMap.get(getContextHandle()).getFileSystem(); + } + + /** + * class used to encapsulate a context which is normally used by + * a single TestSuite or across TestSuites when multi-threaded testing is turned on + */ + public static class Context { + protected String testDir; + protected ManyMiniCluster cluster; + + protected Configuration hbaseConf; + protected Configuration jobConf; + protected HiveConf hiveConf; + + protected FileSystem fileSystem; + + protected int usageCount = 0; + + public Context(String handle) { + testDir = new File(TEST_DIR + "/test_" + handle + "_" + Math.abs(new Random().nextLong()) + "/").getPath(); + System.out.println("Cluster work directory: " + testDir); + } + + public void start() { + if (usageCount++ == 0) { + ManyMiniCluster.Builder b = ManyMiniCluster.create(new File(testDir)); + if (testConf != null) { + b.hbaseConf(HBaseConfiguration.create(testConf)); + } + cluster = b.build(); + cluster.start(); + this.hbaseConf = cluster.getHBaseConf(); + jobConf = cluster.getJobConf(); + fileSystem = cluster.getFileSystem(); + hiveConf = cluster.getHiveConf(); + } + } - protected void createTable(String tableName, String[] families) { + public void stop() { + if (--usageCount == 0) { try { - HBaseAdmin admin = new HBaseAdmin(getHbaseConf()); - HTableDescriptor tableDesc = new HTableDescriptor(tableName); - for (String family : families) { - HColumnDescriptor columnDescriptor = new HColumnDescriptor(family); - tableDesc.addFamily(columnDescriptor); - } - admin.createTable(tableDesc); - } catch (Exception e) { - e.printStackTrace(); - throw new IllegalStateException(e); - } - - } + cluster.stop(); + cluster = null; + } finally { + System.out.println("Trying to cleanup: " + testDir); + try { + FileSystem fs = FileSystem.get(jobConf); + fs.delete(new Path(testDir), true); + } catch (IOException e) { + throw new IllegalStateException("Failed to cleanup test dir", e); + } - protected String newTableName(String prefix) { - String name = null; - int tries = 100; - do { - name = prefix + "_" + Math.abs(new Random().nextLong()); - } while (tableNames.contains(name) && --tries > 0); - if (tableNames.contains(name)) - throw new IllegalStateException("Couldn't find a unique table name, tableNames size: " + tableNames.size()); - tableNames.add(name); - return name; + } + } } - - /** - * startup an hbase cluster instance before a test suite runs - */ - @BeforeClass - public static void setup() { - if (!contextMap.containsKey(getContextHandle())) - contextMap.put(getContextHandle(), new Context(getContextHandle())); - - contextMap.get(getContextHandle()).start(); - } - - /** - * shutdown an hbase cluster instance ant the end of the test suite - */ - @AfterClass - public static void tearDown() { - contextMap.get(getContextHandle()).stop(); - } - - /** - * override this with a different context handle if tests suites are run simultaneously - * and ManyMiniCluster instances shouldn't be shared - * @return - */ - public static String getContextHandle() { - return DEFAULT_CONTEXT_HANDLE; - } - - /** - * @return working directory for a given test context, which normally is a test suite - */ public String getTestDir() { - return contextMap.get(getContextHandle()).getTestDir(); + return testDir; } - /** - * @return ManyMiniCluster instance - */ public ManyMiniCluster getCluster() { - return contextMap.get(getContextHandle()).getCluster(); + return cluster; } - /** - * @return configuration of MiniHBaseCluster - */ public Configuration getHbaseConf() { - return contextMap.get(getContextHandle()).getHbaseConf(); + return hbaseConf; } - /** - * @return configuration of MiniMRCluster - */ public Configuration getJobConf() { - return contextMap.get(getContextHandle()).getJobConf(); + return jobConf; } - /** - * @return configuration of Hive Metastore - */ public HiveConf getHiveConf() { - return contextMap.get(getContextHandle()).getHiveConf(); + return hiveConf; } - /** - * @return filesystem used by ManyMiniCluster daemons - */ public FileSystem getFileSystem() { - return contextMap.get(getContextHandle()).getFileSystem(); - } - - /** - * class used to encapsulate a context which is normally used by - * a single TestSuite or across TestSuites when multi-threaded testing is turned on - */ - public static class Context { - protected String testDir; - protected ManyMiniCluster cluster; - - protected Configuration hbaseConf; - protected Configuration jobConf; - protected HiveConf hiveConf; - - protected FileSystem fileSystem; - - protected int usageCount = 0; - - public Context(String handle) { - try { - testDir = new File(TEST_DIR + "/test_" + handle + "_" + Math.abs(new Random().nextLong()) + "/").getCanonicalPath(); - } catch (IOException e) { - throw new IllegalStateException("Failed to generate testDir", e); - } - System.out.println("Cluster work directory: " + testDir); - } - - public void start() { - if (usageCount++ == 0) { - ManyMiniCluster.Builder b = ManyMiniCluster.create(new File(testDir)); - if (testConf != null) { - b.hbaseConf(HBaseConfiguration.create(testConf)); - } - cluster = b.build(); - cluster.start(); - this.hbaseConf = cluster.getHBaseConf(); - jobConf = cluster.getJobConf(); - fileSystem = cluster.getFileSystem(); - hiveConf = cluster.getHiveConf(); - } - } - - public void stop() { - if (--usageCount == 0) { - try { - cluster.stop(); - cluster = null; - } finally { - System.out.println("Trying to cleanup: " + testDir); - try { - FileSystem fs = FileSystem.get(jobConf); - fs.delete(new Path(testDir), true); - } catch (IOException e) { - throw new IllegalStateException("Failed to cleanup test dir", e); - } - - } - } - } - - public String getTestDir() { - return testDir; - } - - public ManyMiniCluster getCluster() { - return cluster; - } - - public Configuration getHbaseConf() { - return hbaseConf; - } - - public Configuration getJobConf() { - return jobConf; - } - - public HiveConf getHiveConf() { - return hiveConf; - } - - public FileSystem getFileSystem() { - return fileSystem; - } + return fileSystem; } + } }
