Repository: hbase Updated Branches: refs/heads/hbase-11339 1b800f7d4 -> fbbb3249d
HBASE-12820 Use table lock instead of MobZookeeper.(Jingcheng Du) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fbbb3249 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fbbb3249 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fbbb3249 Branch: refs/heads/hbase-11339 Commit: fbbb3249d9ef6aa5bf12ca2abbf67f4a89c86c18 Parents: 1b800f7 Author: anoopsjohn <[email protected]> Authored: Sat Jan 24 11:37:01 2015 +0530 Committer: anoopsjohn <[email protected]> Committed: Sat Jan 24 11:37:01 2015 +0530 ---------------------------------------------------------------------- .../apache/hadoop/hbase/mob/MobConstants.java | 1 + .../org/apache/hadoop/hbase/mob/MobUtils.java | 21 +- .../apache/hadoop/hbase/mob/MobZookeeper.java | 270 ------------------- .../hadoop/hbase/mob/mapreduce/SweepJob.java | 126 +++++---- .../mob/mapreduce/SweepJobNodeTracker.java | 56 ++-- .../hadoop/hbase/mob/mapreduce/SweepMapper.java | 7 +- .../hbase/mob/mapreduce/SweepReducer.java | 13 +- .../hadoop/hbase/regionserver/HMobStore.java | 87 +++--- .../hbase/mob/mapreduce/TestMobSweepMapper.java | 78 ++++-- .../mob/mapreduce/TestMobSweepReducer.java | 77 +++--- .../hbase/mob/mapreduce/TestMobSweeper.java | 4 +- .../hbase/regionserver/TestMobCompaction.java | 56 ---- 12 files changed, 276 insertions(+), 520 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java index 7b0f9a0..f40c952 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java @@ -72,6 +72,7 @@ public class MobConstants { public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l; public final static String TEMP_DIR_NAME = ".tmp"; + public final static byte[] MOB_TABLE_LOCK_SUFFIX = Bytes.toBytes(".mobLock"); public final static String EMPTY_STRING = ""; private MobConstants() { http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index d0bb3ec..43521d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -274,16 +274,6 @@ public class MobUtils { } /** - * Gets the znode name of column family. - * @param tableName The current table name. - * @param familyName The name of the current column family. - * @return The znode name of column family. - */ - public static String getColumnFamilyZNodeName(String tableName, String familyName) { - return tableName + ":" + familyName; - } - - /** * Gets the root dir of the mob files. * It's {HBASE_DIR}/mobdir. * @param conf The current configuration. @@ -548,4 +538,15 @@ public class MobUtils { return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT, cell.getValueLength() - Bytes.SIZEOF_INT); } + + /** + * Gets the table name used in the table lock. + * The table lock name is a dummy one, it's not a table name. It's tableName + ".mobLock". + * @param tn The table name. + * @return The table name used in table lock. 
+ */ + public static TableName getTableLockName(TableName tn) { + byte[] tableName = tn.getName(); + return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX)); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java deleted file mode 100644 index a9557d7..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java +++ /dev/null @@ -1,270 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mob; - -import java.io.IOException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; - -/** - * The zookeeper used for MOB. - * This zookeeper is used to synchronize the HBase major compaction and sweep tool. - * The structure of the nodes for mob in zookeeper. - * |--baseNode - * |--MOB - * |--tableName:columnFamilyName-lock // locks for the mob column family - * |--tableName:columnFamilyName-sweeper // when a sweep tool runs, such a node is added - * |--tableName:columnFamilyName-majorCompaction - * |--UUID //when a major compaction occurs, such a node is added. - * In order to synchronize the operations between the sweep tool and HBase major compaction, these - * actions need to acquire the tableName:columnFamilyName-lock before the sweep tool and major - * compaction run. - * In sweep tool. - * 1. If it acquires the lock successfully. It check whether the sweeper node exists, if exist the - * current running is aborted. If not it it checks whether there're major compaction nodes, if yes - * the current running is aborted, if not it adds a sweep node to the zookeeper. - * 2. If it could not obtain the lock, the current running is aborted. - * In the HBase compaction. - * 1. If it's a minor compaction, continue the compaction. - * 2. If it's a major compaction, it acquires a lock in zookeeper. - * A. If it obtains the lock, it checks whether there's sweep node, if yes it converts itself - * to a minor one and continue, if no it adds a major compaction node to the zookeeper. - * B. 
If it could not obtain the lock, it converts itself to a minor one and continue the - * compaction. - */ [email protected] -public class MobZookeeper { - // TODO Will remove this class before the mob is merged back to master. - private static final Log LOG = LogFactory.getLog(MobZookeeper.class); - - private ZooKeeperWatcher zkw; - private String mobZnode; - private static final String LOCK_EPHEMERAL = "-lock"; - private static final String SWEEPER_EPHEMERAL = "-sweeper"; - private static final String MAJOR_COMPACTION_EPHEMERAL = "-majorCompaction"; - - private MobZookeeper(Configuration conf, String identifier) throws IOException, - KeeperException { - this.zkw = new ZooKeeperWatcher(conf, identifier, new DummyMobAbortable()); - mobZnode = ZKUtil.joinZNode(zkw.baseZNode, "MOB"); - if (ZKUtil.checkExists(zkw, mobZnode) == -1) { - ZKUtil.createWithParents(zkw, mobZnode); - } - } - - /** - * Creates an new instance of MobZookeeper. - * @param conf The current configuration. - * @param identifier string that is passed to RecoverableZookeeper to be used as - * identifier for this instance. - * @return A new instance of MobZookeeper. - * @throws IOException - * @throws KeeperException - */ - public static MobZookeeper newInstance(Configuration conf, String identifier) throws IOException, - KeeperException { - return new MobZookeeper(conf, identifier); - } - - /** - * Acquire a lock on the current column family. - * All the threads try to access the column family acquire a lock which is actually create an - * ephemeral node in the zookeeper. - * @param tableName The current table name. - * @param familyName The current column family name. - * @return True if the lock is obtained successfully. Otherwise false is returned. 
- */ - public boolean lockColumnFamily(String tableName, String familyName) { - String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); - boolean locked = false; - try { - locked = ZKUtil.createEphemeralNodeAndWatch(zkw, - ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL), null); - if (LOG.isDebugEnabled()) { - LOG.debug(locked ? "Locked the column family " + znodeName - : "Can not lock the column family " + znodeName); - } - } catch (KeeperException e) { - LOG.error("Fail to lock the column family " + znodeName, e); - } - return locked; - } - - /** - * Release the lock on the current column family. - * @param tableName The current table name. - * @param familyName The current column family name. - */ - public void unlockColumnFamily(String tableName, String familyName) { - String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); - if (LOG.isDebugEnabled()) { - LOG.debug("Unlocking the column family " + znodeName); - } - try { - ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL)); - } catch (KeeperException e) { - LOG.warn("Fail to unlock the column family " + znodeName, e); - } - } - - /** - * Adds a node to zookeeper which indicates that a sweep tool is running. - * @param tableName The current table name. - * @param familyName The current columnFamilyName name. - * @param data the data of the ephemeral node. - * @return True if the node is created successfully. Otherwise false is returned. - */ - public boolean addSweeperZNode(String tableName, String familyName, byte[] data) { - boolean add = false; - String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); - try { - add = ZKUtil.createEphemeralNodeAndWatch(zkw, - ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL), data); - if (LOG.isDebugEnabled()) { - LOG.debug(add ? 
"Added a znode for sweeper " + znodeName - : "Cannot add a znode for sweeper " + znodeName); - } - } catch (KeeperException e) { - LOG.error("Fail to add a znode for sweeper " + znodeName, e); - } - return add; - } - - /** - * Gets the path of the sweeper znode in zookeeper. - * @param tableName The current table name. - * @param familyName The current columnFamilyName name. - * @return The path of the sweeper znode in zookeper. - */ - public String getSweeperZNodePath(String tableName, String familyName) { - String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); - return ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL); - } - - /** - * Deletes the node from zookeeper which indicates that a sweep tool is finished. - * @param tableName The current table name. - * @param familyName The current column family name. - */ - public void deleteSweeperZNode(String tableName, String familyName) { - String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); - try { - ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL)); - } catch (KeeperException e) { - LOG.error("Fail to delete a znode for sweeper " + znodeName, e); - } - } - - /** - * Checks whether the znode exists in the Zookeeper. - * If the node exists, it means a sweep tool is running. - * Otherwise, the sweep tool is not. - * @param tableName The current table name. - * @param familyName The current column family name. - * @return True if this node doesn't exist. Otherwise false is returned. - * @throws KeeperException - */ - public boolean isSweeperZNodeExist(String tableName, String familyName) throws KeeperException { - String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); - return ZKUtil.checkExists(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL)) >= 0; - } - - /** - * Checks whether there're major compactions nodes in the zookeeper. 
- * If there're such nodes, it means there're major compactions in progress now. - * Otherwise there're not. - * @param tableName The current table name. - * @param familyName The current column family name. - * @return True if there're major compactions in progress. Otherwise false is returned. - * @throws KeeperException - */ - public boolean hasMajorCompactionChildren(String tableName, String familyName) - throws KeeperException { - String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); - String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL); - List<String> children = ZKUtil.listChildrenNoWatch(zkw, mcPath); - return children != null && !children.isEmpty(); - } - - /** - * Creates a node of a major compaction to the Zookeeper. - * Before a HBase major compaction, such a node is created to the Zookeeper. It tells others that - * there're major compaction in progress, the sweep tool could not be run at this time. - * @param tableName The current table name. - * @param familyName The current column family name. - * @param compactionName The current compaction name. - * @return True if the node is created successfully. Otherwise false is returned. - * @throws KeeperException - */ - public boolean addMajorCompactionZNode(String tableName, String familyName, - String compactionName) throws KeeperException { - String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); - String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL); - ZKUtil.createNodeIfNotExistsAndWatch(zkw, mcPath, null); - String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName); - return ZKUtil.createEphemeralNodeAndWatch(zkw, eachMcPath, null); - } - - /** - * Deletes a major compaction node from the Zookeeper. - * @param tableName The current table name. - * @param familyName The current column family name. - * @param compactionName The current compaction name. 
- * @throws KeeperException - */ - public void deleteMajorCompactionZNode(String tableName, String familyName, - String compactionName) throws KeeperException { - String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName); - String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL); - String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName); - ZKUtil.deleteNode(zkw, eachMcPath); - } - - /** - * Closes the MobZookeeper. - */ - public void close() { - this.zkw.close(); - } - - /** - * An dummy abortable. It's used for the MobZookeeper. - */ - public static class DummyMobAbortable implements Abortable { - - private boolean abort = false; - - public void abort(String why, Throwable e) { - abort = true; - } - - public boolean isAborted() { - return abort; - } - - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java index 8caa3b0..1c8bad7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mob.mapreduce; import java.io.File; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.PriorityQueue; @@ -37,32 +38,39 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; 
+import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; -import org.apache.hadoop.hbase.mob.MobZookeeper; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.serializer.JavaSerialization; import org.apache.hadoop.io.serializer.WritableSerialization; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.net.DNS; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.zookeeper.KeeperException; @@ -77,21 +85,23 @@ public class SweepJob { private final FileSystem fs; private final Configuration conf; private static final Log LOG = LogFactory.getLog(SweepJob.class); - static final String SWEEP_JOB_ID = "mob.compaction.id"; - static final String SWEEPER_NODE = 
"mob.compaction.sweep.node"; - static final String WORKING_DIR_KEY = "mob.compaction.dir"; - static final String WORKING_ALLNAMES_FILE_KEY = "mob.compaction.all.file"; - static final String WORKING_VISITED_DIR_KEY = "mob.compaction.visited.dir"; + static final String SWEEP_JOB_ID = "mob.sweep.job.id"; + static final String SWEEP_JOB_SERVERNAME = "mob.sweep.job.servername"; + static final String SWEEP_JOB_TABLE_NODE = "mob.sweep.job.table.node"; + static final String WORKING_DIR_KEY = "mob.sweep.job.dir"; + static final String WORKING_ALLNAMES_FILE_KEY = "mob.sweep.job.all.file"; + static final String WORKING_VISITED_DIR_KEY = "mob.sweep.job.visited.dir"; static final String WORKING_ALLNAMES_DIR = "all"; static final String WORKING_VISITED_DIR = "visited"; - public static final String WORKING_FILES_DIR_KEY = "mob.compaction.files.dir"; - //the MOB_COMPACTION_DELAY is ONE_DAY by default. Its value is only changed when testing. - public static final String MOB_COMPACTION_DELAY = "hbase.mob.compaction.delay"; + public static final String WORKING_FILES_DIR_KEY = "mob.sweep.job.files.dir"; + //the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing. 
+ public static final String MOB_SWEEP_JOB_DELAY = "hbase.mob.sweep.job.delay"; protected static long ONE_DAY = 24 * 60 * 60 * 1000; private long compactionStartTime = EnvironmentEdgeManager.currentTime(); public final static String CREDENTIALS_LOCATION = "credentials_location"; private CacheConfig cacheConfig; static final int SCAN_CACHING = 10000; + private TableLockManager tableLockManager; public SweepJob(Configuration conf, FileSystem fs) { this.conf = conf; @@ -102,6 +112,22 @@ public class SweepJob { cacheConfig = new CacheConfig(copyOfConf); } + static ServerName getCurrentServerName(Configuration conf) throws IOException { + String hostname = conf.get( + "hbase.regionserver.ipc.address", + Strings.domainNamePointerToHostName(DNS.getDefaultHost( + conf.get("hbase.regionserver.dns.interface", "default"), + conf.get("hbase.regionserver.dns.nameserver", "default")))); + int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); + // Creation of a HSA will force a resolve. + InetSocketAddress initialIsa = new InetSocketAddress(hostname, port); + if (initialIsa.getAddress() == null) { + throw new IllegalArgumentException("Failed resolve of " + initialIsa); + } + return ServerName.valueOf(initialIsa.getHostName(), initialIsa.getPort(), + EnvironmentEdgeManager.currentTime()); + } + /** * Runs MapReduce to do the sweeping on the mob files. * There's a MobReferenceOnlyFilter so that the mappers only get the cells that have mob @@ -141,37 +167,21 @@ public class SweepJob { } String familyName = family.getNameAsString(); String id = "SweepJob" + UUID.randomUUID().toString().replace("-", ""); - MobZookeeper zk = MobZookeeper.newInstance(conf, id); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, id, new DummyMobAbortable()); try { - // Try to obtain the lock. Use this lock to synchronize all the query, creation/deletion - // in the Zookeeper. 
- if (!zk.lockColumnFamily(tn.getNameAsString(), familyName)) { - LOG.warn("Can not lock the store " + familyName - + ". The major compaction in HBase may be in-progress. Please re-run the job."); - return 3; - } + ServerName serverName = getCurrentServerName(conf); + tableLockManager = TableLockManager.createTableLockManager(conf, zkw, serverName); + TableName lockName = MobUtils.getTableLockName(tn); + TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool"); + String tableName = tn.getNameAsString(); + // Try to obtain the lock. Use this lock to synchronize all the query try { - // Checks whether there're HBase major compaction now. - boolean hasChildren = zk.hasMajorCompactionChildren(tn.getNameAsString(), familyName); - if (hasChildren) { - LOG.warn("The major compaction in HBase may be in-progress." - + " Please re-run the job."); - return 4; - } else { - // Checks whether there's sweep tool in progress. - boolean hasSweeper = zk.isSweeperZNodeExist(tn.getNameAsString(), familyName); - if (hasSweeper) { - LOG.warn("Another sweep job is running"); - return 5; - } else { - // add the sweeper node, mark that there's one sweep tool in progress. - // All the HBase major compaction and sweep tool in this column family could not - // run until this sweep tool is finished. - zk.addSweeperZNode(tn.getNameAsString(), familyName, Bytes.toBytes(id)); - } - } - } finally { - zk.unlockColumnFamily(tn.getNameAsString(), familyName); + lock.acquire(); + } catch (Exception e) { + LOG.warn("Can not lock the table " + tableName + + ". The major compaction in HBase may be in-progress or another sweep job is running." 
+ + " Please re-run the job."); + return 3; } Job job = null; try { @@ -186,7 +196,9 @@ public class SweepJob { conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() + "," + WritableSerialization.class.getName()); conf.set(SWEEP_JOB_ID, id); - conf.set(SWEEPER_NODE, zk.getSweeperZNodePath(tn.getNameAsString(), familyName)); + conf.set(SWEEP_JOB_SERVERNAME, serverName.toString()); + String tableLockNode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString()); + conf.set(SWEEP_JOB_TABLE_NODE, tableLockNode); job = prepareJob(tn, familyName, scan, conf); job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName); // Record the compaction start time. @@ -204,14 +216,21 @@ public class SweepJob { removeUnusedFiles(job, tn, family); } else { System.err.println("Job Failed"); - return 6; + return 4; } } finally { - cleanup(job, tn, familyName); - zk.deleteSweeperZNode(tn.getNameAsString(), familyName); + try { + cleanup(job, tn, familyName); + } finally { + try { + lock.release(); + } catch (IOException e) { + LOG.error("Fail to release the table lock " + tableName, e); + } + } } } finally { - zk.close(); + zkw.close(); } return 0; } @@ -305,7 +324,7 @@ public class SweepJob { // archive them. FileStatus[] files = fs.listStatus(mobStorePath); Set<String> fileNames = new TreeSet<String>(); - long mobCompactionDelay = job.getConfiguration().getLong(MOB_COMPACTION_DELAY, ONE_DAY); + long mobCompactionDelay = job.getConfiguration().getLong(MOB_SWEEP_JOB_DELAY, ONE_DAY); for (FileStatus fileStatus : files) { if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath())) { if (compactionStartTime - fileStatus.getModificationTime() > mobCompactionDelay) { @@ -422,9 +441,8 @@ public class SweepJob { * Deletes the working directory. * @param job The current job. 
* @param familyName The family to cleanup - * @throws IOException */ - private void cleanup(Job job, TableName tn, String familyName) throws IOException { + private void cleanup(Job job, TableName tn, String familyName) { if (job != null) { // delete the working directory Path workingPath = new Path(job.getConfiguration().get(WORKING_DIR_KEY)); @@ -563,4 +581,18 @@ public class SweepJob { */ RECORDS_UPDATED, } + + public static class DummyMobAbortable implements Abortable { + + private boolean abort = false; + + public void abort(String why, Throwable e) { + abort = true; + } + + public boolean isAborted() { + return abort; + } + + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java index b789332..7844359 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java @@ -18,8 +18,14 @@ */ package org.apache.hadoop.hbase.mob.mapreduce; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -36,38 +42,58 @@ import 
org.apache.zookeeper.KeeperException; @InterfaceAudience.Private public class SweepJobNodeTracker extends ZooKeeperListener { - private String node; - private String sweepJobId; + private String parentNode; + private String lockNodePrefix; + private String owner; + private String lockNode; - public SweepJobNodeTracker(ZooKeeperWatcher watcher, String node, String sweepJobId) { + public SweepJobNodeTracker(ZooKeeperWatcher watcher, String parentNode, String owner) { super(watcher); - this.node = node; - this.sweepJobId = sweepJobId; + this.parentNode = parentNode; + this.owner = owner; + this.lockNodePrefix = ZKUtil.joinZNode(parentNode, "write-"); } /** * Registers the watcher on the sweep job node. * If there's no such a sweep job node, or it's not created by the sweep job that * owns the current MR, the current process will be aborted. + * This assumes the table lock uses the Zookeeper. It's a workaround and only used + * in the sweep tool, and the sweep tool will be removed after the mob file compaction + * is finished. 
*/ public void start() throws KeeperException { watcher.registerListener(this); - if (ZKUtil.watchAndCheckExists(watcher, node)) { - byte[] data = ZKUtil.getDataAndWatch(watcher, node); - if (data != null) { - if (!sweepJobId.equals(Bytes.toString(data))) { - System.exit(1); + List<String> children = ZKUtil.listChildrenNoWatch(watcher, parentNode); + if (children != null && !children.isEmpty()) { + // there are locks + TreeSet<String> sortedChildren = new TreeSet<String>(); + sortedChildren.addAll(children); + // find all the write locks + SortedSet<String> tails = sortedChildren.tailSet(lockNodePrefix); + if (!tails.isEmpty()) { + for (String tail : tails) { + String path = ZKUtil.joinZNode(parentNode, tail); + byte[] data = ZKUtil.getDataAndWatch(watcher, path); + TableLock lock = TableLockManager.fromBytes(data); + ServerName serverName = lock.getLockOwner(); + org.apache.hadoop.hbase.ServerName sn = org.apache.hadoop.hbase.ServerName.valueOf( + serverName.getHostName(), serverName.getPort(), serverName.getStartCode()); + // compare the server names (host, port and start code), make sure the lock is created + if (owner.equals(sn.toString())) { + lockNode = path; + return; + } } } - } else { - System.exit(1); } + System.exit(1); } @Override public void nodeDeleted(String path) { - // If the ephemeral node is deleted, abort the current process. - if (node.equals(path)) { + // If the lock node is deleted, abort the current process. 
+ if (path.equals(lockNode)) { System.exit(1); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java index f508b93..56e5726 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mob.MobUtils; -import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable; +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.Text; import org.apache.zookeeper.KeeperException; @@ -45,11 +45,12 @@ public class SweepMapper extends TableMapper<Text, KeyValue> { protected void setup(Context context) throws IOException, InterruptedException { String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID); - String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE); + String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME); + String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE); zkw = new ZooKeeperWatcher(context.getConfiguration(), id, new DummyMobAbortable()); try { - SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, id); + SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner); tracker.start(); } catch (KeeperException e) { throw new IOException(e); 
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java index 9fd5750..73ca1a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java @@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobFile; import org.apache.hadoop.hbase.mob.MobFileName; import org.apache.hadoop.hbase.mob.MobUtils; -import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable; +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable; import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.DefaultMemStore; @@ -102,8 +102,8 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> { protected void setup(Context context) throws IOException, InterruptedException { this.conf = context.getConfiguration(); this.fs = FileSystem.get(conf); - // the MOB_COMPACTION_DELAY is ONE_DAY by default. Its value is only changed when testing. - mobCompactionDelay = conf.getLong(SweepJob.MOB_COMPACTION_DELAY, SweepJob.ONE_DAY); + // the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing. 
+ mobCompactionDelay = conf.getLong(SweepJob.MOB_SWEEP_JOB_DELAY, SweepJob.ONE_DAY); String tableName = conf.get(TableInputFormat.INPUT_TABLE); String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY); TableName tn = TableName.valueOf(tableName); @@ -125,7 +125,7 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> { } // disable the block cache. Configuration copyOfConf = new Configuration(conf); - copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.00001f); + copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); this.cacheConfig = new CacheConfig(copyOfConf); table = new HTable(this.conf, Bytes.toBytes(tableName)); @@ -148,12 +148,13 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> { @Override public void run(Context context) throws IOException, InterruptedException { String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID); - String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE); + String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME); + String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE); ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId, new DummyMobAbortable()); FSDataOutputStream fout = null; try { - SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, jobId); + SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner); tracker.start(); setup(context); // create a sequence contains all the visited file names in this reducer. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 218a4ef..3c8fa87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -46,17 +46,17 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.mob.MobCacheConfig; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobFile; import org.apache.hadoop.hbase.mob.MobFileName; import org.apache.hadoop.hbase.mob.MobStoreEngine; import org.apache.hadoop.hbase.mob.MobUtils; -import org.apache.hadoop.hbase.mob.MobZookeeper; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HFileArchiveUtil; -import org.apache.zookeeper.KeeperException; /** * The store implementation to save MOBs (medium objects), it extends the HStore. 
@@ -91,6 +91,8 @@ public class HMobStore extends HStore { private volatile long mobScanCellsSize = 0; private List<Path> mobDirLocations; private HColumnDescriptor family; + private TableLockManager tableLockManager; + private TableName tableLockName; public HMobStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam) throws IOException { @@ -105,6 +107,10 @@ public class HMobStore extends HStore { TableName tn = region.getTableDesc().getTableName(); mobDirLocations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString())); + if (region.getRegionServerServices() != null) { + tableLockManager = region.getRegionServerServices().getTableLockManager(); + tableLockName = MobUtils.getTableLockName(getTableName()); + } } /** @@ -425,59 +431,40 @@ public class HMobStore extends HStore { // compaction as retainDeleteMarkers and continue the compaction. // 1.2.2. If the node is not there, add a child to the major compaction node, and // run the compaction directly. 
- String compactionName = UUID.randomUUID().toString().replaceAll("-", ""); - MobZookeeper zk = null; - try { - zk = MobZookeeper.newInstance(region.getBaseConf(), compactionName); - } catch (KeeperException e) { - LOG.error("Cannot connect to the zookeeper, forcing the delete markers to be retained", e); - compaction.getRequest().forceRetainDeleteMarkers(); - return super.compact(compaction); + TableLock lock = null; + if (tableLockManager != null) { + lock = tableLockManager.readLock(tableLockName, "Major compaction in HMobStore"); + } + boolean tableLocked = false; + String tableName = getTableName().getNameAsString(); + if (lock != null) { + try { + LOG.info("Start to acquire a read lock for the table[" + tableName + + "], ready to perform the major compaction"); + lock.acquire(); + tableLocked = true; + } catch (Exception e) { + LOG.error("Fail to lock the table " + tableName, e); + } + } else { + // If the tableLockManager is null, mark the tableLocked as true. + tableLocked = true; } - boolean keepDeleteMarkers = true; - boolean majorCompactNodeAdded = false; try { - // try to acquire the operation lock. - if (zk.lockColumnFamily(getTableName().getNameAsString(), getFamily().getNameAsString())) { - try { - LOG.info("Obtain the lock for the store[" + this - + "], ready to perform the major compaction"); - // check the sweeping node to find out whether the sweeping is in progress. - boolean hasSweeper = zk.isSweeperZNodeExist(getTableName().getNameAsString(), - getFamily().getNameAsString()); - if (!hasSweeper) { - // if not, add a child to the major compaction node of this store. - majorCompactNodeAdded = zk.addMajorCompactionZNode(getTableName().getNameAsString(), - getFamily().getNameAsString(), compactionName); - // If we failed to add the major compact node, go with keep delete markers mode. 
- keepDeleteMarkers = !majorCompactNodeAdded; - } - } catch (Exception e) { - LOG.error("Fail to handle the Zookeeper", e); - } finally { - // release the operation lock - zk.unlockColumnFamily(getTableName().getNameAsString(), getFamily().getNameAsString()); - } + if (!tableLocked) { + LOG.warn("Cannot obtain the table lock, maybe a sweep tool is running on this table[" + + tableName + "], forcing the delete markers to be retained"); + compaction.getRequest().forceRetainDeleteMarkers(); } - try { - if (keepDeleteMarkers) { - LOG.warn("Cannot obtain the lock or a sweep tool is running on this store[" + this - + "], forcing the delete markers to be retained"); - compaction.getRequest().forceRetainDeleteMarkers(); - } - return super.compact(compaction); - } finally { - if (majorCompactNodeAdded) { - try { - zk.deleteMajorCompactionZNode(getTableName().getNameAsString(), getFamily() - .getNameAsString(), compactionName); - } catch (KeeperException e) { - LOG.error("Fail to delete the compaction znode" + compactionName, e); - } + return super.compact(compaction); + } finally { + if (tableLocked && lock != null) { + try { + lock.release(); + } catch (IOException e) { + LOG.error("Fail to release the table lock " + tableName, e); } } - } finally { - zk.close(); } } else { // If it's not a major compaction, continue the compaction. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java index a7e2538..2aa3a4a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java @@ -17,15 +17,27 @@ */ package org.apache.hadoop.hbase.mob.mapreduce; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mob.MobZookeeper; -import org.apache.hadoop.hbase.mob.mapreduce.SweepMapper; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.junit.AfterClass; @@ -35,9 +47,6 @@ import org.junit.experimental.categories.Category; import 
org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.*; - @Category(SmallTests.class) public class TestMobSweepMapper { @@ -71,30 +80,41 @@ public class TestMobSweepMapper { when(columns.raw()).thenReturn(kvList); Configuration configuration = new Configuration(TEST_UTIL.getConfiguration()); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable()); + TableName tn = TableName.valueOf("testSweepMapper"); + TableName lockName = MobUtils.getTableLockName(tn); + String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString()); configuration.set(SweepJob.SWEEP_JOB_ID, "1"); - configuration.set(SweepJob.SWEEPER_NODE, "/hbase/MOB/testSweepMapper:family-sweeper"); - - MobZookeeper zk = MobZookeeper.newInstance(configuration, "1"); - zk.addSweeperZNode("testSweepMapper", "family", Bytes.toBytes("1")); - - Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx = - mock(Mapper.Context.class); - when(ctx.getConfiguration()).thenReturn(configuration); - SweepMapper map = new SweepMapper(); - doAnswer(new Answer<Void>() { - - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - Text text = (Text) invocation.getArguments()[0]; - KeyValue kv = (KeyValue) invocation.getArguments()[1]; - - assertEquals(Bytes.toString(text.getBytes(), 0, text.getLength()), fileName); - assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey())); - - return null; - } - }).when(ctx).write(any(Text.class), any(KeyValue.class)); - - map.map(r, columns, ctx); + configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode); + ServerName serverName = SweepJob.getCurrentServerName(configuration); + configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString()); + + TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw, + serverName); + TableLock lock = 
tableLockManager.writeLock(lockName, "Run sweep tool"); + lock.acquire(); + try { + Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx = + mock(Mapper.Context.class); + when(ctx.getConfiguration()).thenReturn(configuration); + SweepMapper map = new SweepMapper(); + doAnswer(new Answer<Void>() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Text text = (Text) invocation.getArguments()[0]; + KeyValue kv = (KeyValue) invocation.getArguments()[1]; + + assertEquals(Bytes.toString(text.getBytes(), 0, text.getLength()), fileName); + assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey())); + + return null; + } + }).when(ctx).write(any(Text.class), any(KeyValue.class)); + + map.map(r, columns, ctx); + } finally { + lock.release(); + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java index a45ed34..1a69d06 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java @@ -36,16 +36,21 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.master.TableLockManager; +import 
org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; -import org.apache.hadoop.hbase.mob.MobZookeeper; +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable; import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -126,11 +131,11 @@ public class TestMobSweepReducer { @Test public void testRun() throws Exception { + TableName tn = TableName.valueOf(tableName); byte[] mobValueBytes = new byte[100]; //get the path where mob files lie in - Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), - TableName.valueOf(tableName), family); + Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, family); Put put = new Put(Bytes.toBytes(row)); put.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes); @@ -139,7 +144,7 @@ public class TestMobSweepReducer { table.put(put); table.put(put2); table.flushCommits(); - admin.flush(TableName.valueOf(tableName)); + admin.flush(tn); FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath); //check the generation of a mob file @@ -159,34 +164,42 @@ public class TestMobSweepReducer { configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, System.currentTimeMillis() + 24 * 3600 * 1000); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable()); + TableName lockName = MobUtils.getTableLockName(tn); + String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString()); configuration.set(SweepJob.SWEEP_JOB_ID, "1"); - configuration.set(SweepJob.SWEEPER_NODE, "/hbase/MOB/testSweepReducer:family-sweeper"); 
- - MobZookeeper zk = MobZookeeper.newInstance(configuration, "1"); - zk.addSweeperZNode(tableName, family, Bytes.toBytes("1")); - - //use the same counter when mocking - Counter counter = new GenericCounter(); - Reducer<Text, KeyValue, Writable, Writable>.Context ctx = - mock(Reducer.Context.class); - when(ctx.getConfiguration()).thenReturn(configuration); - when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter); - when(ctx.nextKey()).thenReturn(true).thenReturn(false); - when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1)); - - byte[] refBytes = Bytes.toBytes(mobFile1); - long valueLength = refBytes.length; - byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes); - KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), - Bytes.toBytes(qf), 1, KeyValue.Type.Put, newValue); - List<KeyValue> list = new ArrayList<KeyValue>(); - list.add(kv2); - - when(ctx.getValues()).thenReturn(list); - - SweepReducer reducer = new SweepReducer(); - reducer.run(ctx); - + configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode); + ServerName serverName = SweepJob.getCurrentServerName(configuration); + configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString()); + + TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw, + serverName); + TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool"); + lock.acquire(); + try { + // use the same counter when mocking + Counter counter = new GenericCounter(); + Reducer<Text, KeyValue, Writable, Writable>.Context ctx = mock(Reducer.Context.class); + when(ctx.getConfiguration()).thenReturn(configuration); + when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter); + when(ctx.nextKey()).thenReturn(true).thenReturn(false); + when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1)); + + byte[] refBytes = Bytes.toBytes(mobFile1); + long valueLength = refBytes.length; + byte[] newValue = 
Bytes.add(Bytes.toBytes(valueLength), refBytes); + KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qf), 1, + KeyValue.Type.Put, newValue); + List<KeyValue> list = new ArrayList<KeyValue>(); + list.add(kv2); + + when(ctx.getValues()).thenReturn(list); + + SweepReducer reducer = new SweepReducer(); + reducer.run(ctx); + } finally { + lock.release(); + } FileStatus[] filsStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath); String mobFile2 = filsStatuses2[0].getPath().getName(); //new mob file is generated, old one has been archived @@ -194,7 +207,7 @@ public class TestMobSweepReducer { assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1)); //test sequence file - String workingPath = configuration.get("mob.compaction.visited.dir"); + String workingPath = configuration.get(SweepJob.WORKING_VISITED_DIR_KEY); FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath)); Set<String> files = new TreeSet<String>(); for (FileStatus st : statuses) { http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java index 2021bd8..c4817aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java @@ -180,7 +180,7 @@ public class TestMobSweeper { Configuration conf = TEST_UTIL.getConfiguration(); - conf.setLong(SweepJob.MOB_COMPACTION_DELAY, 24 * 60 * 60 * 1000); + conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 24 * 60 * 60 * 1000); String[] args = new String[2]; args[0] = tableName; @@ -260,7 +260,7 @@ public class TestMobSweeper { Configuration 
conf = TEST_UTIL.getConfiguration(); - conf.setLong(SweepJob.MOB_COMPACTION_DELAY, 0); + conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 0); String[] args = new String[2]; args[0] = tableName; http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java index fb85e87..2d68cd1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java @@ -224,62 +224,6 @@ public class TestMobCompaction { countMobCellsInMetadata()); } - /** - * Tests the major compaction when the zk is not connected. - * After that the major compaction will be marked as retainDeleteMarkers, the delete marks - * will be retained. 
- * @throws Exception - */ - @Test - public void testMajorCompactionWithZKError() throws Exception { - Configuration conf = new Configuration(UTIL.getConfiguration()); - // use the wrong zk settings - conf.setInt("zookeeper.recovery.retry", 0); - conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 100); - conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, - conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181) - 1); - init(conf, 200); - byte[] dummyData = makeDummyData(300); // larger than mob threshold - HRegionIncommon loader = new HRegionIncommon(region); - byte[] deleteRow = Bytes.toBytes(0); - for (int i = 0; i < compactionThreshold - 1 ; i++) { - Put p = new Put(Bytes.toBytes(i)); - p.setDurability(Durability.SKIP_WAL); - p.add(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData); - loader.put(p); - loader.flushcache(); - } - Delete delete = new Delete(deleteRow); - delete.deleteFamily(COLUMN_FAMILY); - region.delete(delete); - loader.flushcache(); - - assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles()); - region.compactStores(true); - assertEquals("After compaction: store files", 1, countStoreFiles()); - - Scan scan = new Scan(); - scan.setRaw(true); - InternalScanner scanner = region.getScanner(scan); - List<Cell> results = new ArrayList<Cell>(); - scanner.next(results); - int deleteCount = 0; - while (!results.isEmpty()) { - for (Cell c : results) { - if (c.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) { - deleteCount++; - assertTrue(Bytes.equals(CellUtil.cloneRow(c), deleteRow)); - } - } - results.clear(); - scanner.next(results); - } - // assert the delete mark is retained, the major compaction is marked as - // retainDeleteMarkers. - assertEquals(1, deleteCount); - scanner.close(); - } - @Test public void testMajorCompactionAfterDelete() throws Exception { init(UTIL.getConfiguration(), 100);
