Author: jimk
Date: Thu Dec 13 12:02:30 2007
New Revision: 604011

URL: http://svn.apache.org/viewvc?rev=604011&view=rev
Log:
HADOOP-2417 Fix critical shutdown problem introduced by HADOOP-2338
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=604011&r1=604010&r2=604011&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original) +++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Thu Dec 13 12:02:30 2007 @@ -77,7 +77,8 @@ HADOOP-2396 NPE in HMaster.cancelLease HADOOP-2397 The only time that a meta scanner should try to recover a log is when the master is starting - + HADOOP-2417 Fix critical shutdown problem introduced by HADOOP-2338 + IMPROVEMENTS HADOOP-2401 Add convenience put method that takes writable (Johan Oskarsson via Stack) Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=604011&r1=604010&r2=604011&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Thu Dec 13 12:02:30 2007 @@ -181,16 +181,14 @@ */ abstract class BaseScanner extends Chore { protected boolean rootRegion; - protected final Text tableName; protected abstract boolean initialScan(); protected abstract void maintenanceScan(); - BaseScanner(final Text tableName, final int period, + BaseScanner(final boolean rootRegion, final int period, final AtomicBoolean stop) { super(period, stop); - 
this.tableName = tableName; - this.rootRegion = tableName.equals(ROOT_TABLE_NAME); + this.rootRegion = rootRegion; } @Override @@ -506,7 +504,7 @@ class RootScanner extends BaseScanner { /** Constructor */ public RootScanner() { - super(HConstants.ROOT_TABLE_NAME, metaRescanInterval, closed); + super(true, metaRescanInterval, closed); } private boolean scanRoot() { @@ -671,7 +669,7 @@ /** Constructor */ public MetaScanner() { - super(HConstants.META_TABLE_NAME, metaRescanInterval, closed); + super(false, metaRescanInterval, closed); } private boolean scanOneMetaRegion(MetaRegion region) { @@ -1182,16 +1180,25 @@ * regions can shut down. */ private void stopScanners() { + if (LOG.isDebugEnabled()) { + LOG.debug("telling root scanner to stop"); + } synchronized(rootScannerLock) { if (rootScannerThread.isAlive()) { rootScannerThread.interrupt(); // Wake root scanner } } + if (LOG.isDebugEnabled()) { + LOG.debug("telling meta scanner to stop"); + } synchronized(metaScannerLock) { if (metaScannerThread.isAlive()) { metaScannerThread.interrupt(); // Wake meta scanner } } + if (LOG.isDebugEnabled()) { + LOG.debug("meta and root scanners notified"); + } } /* @@ -1341,18 +1348,23 @@ } } else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) { LOG.info("Region server " + serverName + " quiesced"); - if(quiescedMetaServers.incrementAndGet() == serversToServerInfo.size()) { - // If the only servers we know about are meta servers, then we can - // proceed with shutdown - LOG.info("All user tables quiesced. 
Proceeding with shutdown"); - closed.set(true); - stopScanners(); - synchronized(toDoQueue) { - toDoQueue.clear(); // Empty the queue - delayedToDoQueue.clear(); // Empty shut down queue - toDoQueue.notifyAll(); // Wake main thread - } - } + quiescedMetaServers.incrementAndGet(); + } + } + + if(quiescedMetaServers.get() >= serversToServerInfo.size()) { + // If the only servers we know about are meta servers, then we can + // proceed with shutdown + LOG.info("All user tables quiesced. Proceeding with shutdown"); + closed.set(true); + stopScanners(); + synchronized(toDoQueue) { + toDoQueue.clear(); // Empty the queue + delayedToDoQueue.clear(); // Empty shut down queue + toDoQueue.notifyAll(); // Wake main thread + } + synchronized (serversToServerInfo) { + serversToServerInfo.notifyAll(); } } @@ -1638,7 +1650,7 @@ " split. New regions are: " + newRegionA.getRegionName() + ", " + newRegionB.getRegionName()); - if (region.getTableDesc().getName().equals(META_TABLE_NAME)) { + if (region.isMetaTable()) { // A meta region has split. onlineMetaRegions.remove(region.getStartKey()); @@ -2028,7 +2040,7 @@ serverName + "> (or server is null). 
Marking unassigned if " + "meta and clearing pendingRegions"); - if (info.getTableDesc().getName().equals(META_TABLE_NAME)) { + if (info.isMetaTable()) { if (LOG.isDebugEnabled()) { LOG.debug("removing meta region " + info.getRegionName() + " from online meta regions"); Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=604011&r1=604010&r2=604011&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Thu Dec 13 12:02:30 2007 @@ -225,6 +225,7 @@ protected final long threadWakeFrequency; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Integer updateLock = new Integer(0); + private final Integer splitLock = new Integer(0); private final long desiredMaxFileSize; private final long minSequenceId; private final String encodedRegionName; @@ -381,54 +382,56 @@ LOG.info("region " + this.regionInfo.getRegionName() + " already closed"); return null; } - lock.writeLock().lock(); - try { - synchronized (writestate) { - while (writestate.compacting || writestate.flushing) { - try { - writestate.wait(); - } catch (InterruptedException iex) { - // continue + synchronized (splitLock) { + lock.writeLock().lock(); + try { + synchronized (writestate) { + while (writestate.compacting || writestate.flushing) { + try { + writestate.wait(); + } catch (InterruptedException iex) { + // continue + } } + // Disable compacting and flushing by background threads for this + // region. + writestate.writesEnabled = false; } - // Disable compacting and flushing by background threads for this - // region. 
- writestate.writesEnabled = false; - } - - // Wait for active scanners to finish. The write lock we hold will prevent - // new scanners from being created. - - synchronized (activeScannerCount) { - while (activeScannerCount.get() != 0) { - try { - activeScannerCount.wait(); - - } catch (InterruptedException e) { - // continue + + // Wait for active scanners to finish. The write lock we hold will prevent + // new scanners from being created. + + synchronized (activeScannerCount) { + while (activeScannerCount.get() != 0) { + try { + activeScannerCount.wait(); + + } catch (InterruptedException e) { + // continue + } } } + + // Write lock means no more row locks can be given out. Wait on + // outstanding row locks to come in before we close so we do not drop + // outstanding updates. + waitOnRowLocks(); + + // Don't flush the cache if we are aborting + if (!abort) { + internalFlushcache(snapshotMemcaches()); + } + + List<HStoreFile> result = new ArrayList<HStoreFile>(); + for (HStore store: stores.values()) { + result.addAll(store.close()); + } + this.closed.set(true); + LOG.info("closed " + this.regionInfo.getRegionName()); + return result; + } finally { + lock.writeLock().unlock(); } - - // Write lock means no more row locks can be given out. Wait on - // outstanding row locks to come in before we close so we do not drop - // outstanding updates. 
- waitOnRowLocks(); - - // Don't flush the cache if we are aborting - if (!abort) { - internalFlushcache(snapshotMemcaches()); - } - - List<HStoreFile> result = new ArrayList<HStoreFile>(); - for (HStore store: stores.values()) { - result.addAll(store.close()); - } - this.closed.set(true); - LOG.info("closed " + this.regionInfo.getRegionName()); - return result; - } finally { - lock.writeLock().unlock(); } } @@ -541,89 +544,91 @@ HRegion[] splitRegion(final RegionUnavailableListener listener) throws IOException { - Text midKey = new Text(); - if (!needsSplit(midKey)) { - return null; - } - long startTime = System.currentTimeMillis(); - Path splits = getSplitsDir(); - HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(), - this.regionInfo.getStartKey(), midKey); - Path dirA = getSplitRegionDir(splits, - HRegionInfo.encodeRegionName(regionAInfo.getRegionName())); - if(fs.exists(dirA)) { - throw new IOException("Cannot split; target file collision at " + dirA); - } - HRegionInfo regionBInfo = new HRegionInfo(this.regionInfo.getTableDesc(), - midKey, null); - Path dirB = getSplitRegionDir(splits, - HRegionInfo.encodeRegionName(regionBInfo.getRegionName())); - if(this.fs.exists(dirB)) { - throw new IOException("Cannot split; target file collision at " + dirB); - } - - // Notify the caller that we are about to close the region. This moves - // us to the 'retiring' queue. Means no more updates coming in -- just - // whatever is outstanding. - if (listener != null) { - listener.closing(getRegionName()); - } - - // Now close the HRegion. Close returns all store files or null if not - // supposed to close (? What to do in this case? Implement abort of close?) - // Close also does wait on outstanding rows and calls a flush just-in-case. 
- List<HStoreFile> hstoreFilesToSplit = close(); - if (hstoreFilesToSplit == null) { - LOG.warn("Close came back null (Implement abort of close?)"); - throw new RuntimeException("close returned empty vector of HStoreFiles"); - } - - // Tell listener that region is now closed and that they can therefore - // clean up any outstanding references. - if (listener != null) { - listener.closed(this.getRegionName()); - } - - // Split each store file. - for(HStoreFile h: hstoreFilesToSplit) { - // A reference to the bottom half of the hsf store file. - HStoreFile.Reference aReference = new HStoreFile.Reference( - this.encodedRegionName, h.getFileId(), new HStoreKey(midKey), - HStoreFile.Range.bottom); - HStoreFile a = new HStoreFile(this.conf, splits, - HRegionInfo.encodeRegionName(regionAInfo.getRegionName()), - h.getColFamily(), Math.abs(rand.nextLong()), aReference); - // Reference to top half of the hsf store file. - HStoreFile.Reference bReference = new HStoreFile.Reference( - this.encodedRegionName, h.getFileId(), new HStoreKey(midKey), - HStoreFile.Range.top); - HStoreFile b = new HStoreFile(this.conf, splits, - HRegionInfo.encodeRegionName(regionBInfo.getRegionName()), - h.getColFamily(), Math.abs(rand.nextLong()), bReference); - h.splitStoreFile(a, b, this.fs); - } - - // Done! - // Opening the region copies the splits files from the splits directory - // under each region. 
- HRegion regionA = - new HRegion(rootDir, log, fs, conf, regionAInfo, dirA, null); - regionA.close(); - HRegion regionB = - new HRegion(rootDir, log, fs, conf, regionBInfo, dirB, null); - regionB.close(); + synchronized (splitLock) { + Text midKey = new Text(); + if (closed.get() || !needsSplit(midKey)) { + return null; + } + long startTime = System.currentTimeMillis(); + Path splits = getSplitsDir(); + HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(), + this.regionInfo.getStartKey(), midKey); + Path dirA = getSplitRegionDir(splits, + HRegionInfo.encodeRegionName(regionAInfo.getRegionName())); + if(fs.exists(dirA)) { + throw new IOException("Cannot split; target file collision at " + dirA); + } + HRegionInfo regionBInfo = new HRegionInfo(this.regionInfo.getTableDesc(), + midKey, null); + Path dirB = getSplitRegionDir(splits, + HRegionInfo.encodeRegionName(regionBInfo.getRegionName())); + if(this.fs.exists(dirB)) { + throw new IOException("Cannot split; target file collision at " + dirB); + } + + // Notify the caller that we are about to close the region. This moves + // us to the 'retiring' queue. Means no more updates coming in -- just + // whatever is outstanding. + if (listener != null) { + listener.closing(getRegionName()); + } + + // Now close the HRegion. Close returns all store files or null if not + // supposed to close (? What to do in this case? Implement abort of close?) + // Close also does wait on outstanding rows and calls a flush just-in-case. + List<HStoreFile> hstoreFilesToSplit = close(); + if (hstoreFilesToSplit == null) { + LOG.warn("Close came back null (Implement abort of close?)"); + throw new RuntimeException("close returned empty vector of HStoreFiles"); + } + + // Tell listener that region is now closed and that they can therefore + // clean up any outstanding references. + if (listener != null) { + listener.closed(this.getRegionName()); + } + + // Split each store file. 
+ for(HStoreFile h: hstoreFilesToSplit) { + // A reference to the bottom half of the hsf store file. + HStoreFile.Reference aReference = new HStoreFile.Reference( + this.encodedRegionName, h.getFileId(), new HStoreKey(midKey), + HStoreFile.Range.bottom); + HStoreFile a = new HStoreFile(this.conf, splits, + HRegionInfo.encodeRegionName(regionAInfo.getRegionName()), + h.getColFamily(), Math.abs(rand.nextLong()), aReference); + // Reference to top half of the hsf store file. + HStoreFile.Reference bReference = new HStoreFile.Reference( + this.encodedRegionName, h.getFileId(), new HStoreKey(midKey), + HStoreFile.Range.top); + HStoreFile b = new HStoreFile(this.conf, splits, + HRegionInfo.encodeRegionName(regionBInfo.getRegionName()), + h.getColFamily(), Math.abs(rand.nextLong()), bReference); + h.splitStoreFile(a, b, this.fs); + } + + // Done! + // Opening the region copies the splits files from the splits directory + // under each region. + HRegion regionA = + new HRegion(rootDir, log, fs, conf, regionAInfo, dirA, null); + regionA.close(); + HRegion regionB = + new HRegion(rootDir, log, fs, conf, regionBInfo, dirB, null); + regionB.close(); - // Cleanup - boolean deleted = fs.delete(splits); // Get rid of splits directory - if (LOG.isDebugEnabled()) { - LOG.debug("Cleaned up " + splits.toString() + " " + deleted); + // Cleanup + boolean deleted = fs.delete(splits); // Get rid of splits directory + if (LOG.isDebugEnabled()) { + LOG.debug("Cleaned up " + splits.toString() + " " + deleted); + } + HRegion regions[] = new HRegion [] {regionA, regionB}; + LOG.info("Region split of " + this.regionInfo.getRegionName() + + " complete; " + "new regions: " + regions[0].getRegionName() + ", " + + regions[1].getRegionName() + ". 
Split took " + + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); + return regions; } - HRegion regions[] = new HRegion [] {regionA, regionB}; - LOG.info("Region split of " + this.regionInfo.getRegionName() + - " complete; " + "new regions: " + regions[0].getRegionName() + ", " + - regions[1].getRegionName() + ". Split took " + - StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); - return regions; } /* @@ -1030,6 +1035,7 @@ * avoid a bunch of disk activity. * * @param row + * @param ts * @return Map<columnName, byte[]> values * @throws IOException */ @@ -1282,6 +1288,7 @@ * @param row The row to operate on * @param family The column family to match * @param timestamp Timestamp to match + * @throws IOException */ public void deleteFamily(Text row, Text family, long timestamp) throws IOException{ Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=604011&r1=604010&r2=604011&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Thu Dec 13 12:02:30 2007 @@ -79,9 +79,9 @@ // of HRegionServer in isolation. We use AtomicBoolean rather than // plain boolean so we can pass a reference to Chore threads. Otherwise, // Chore threads need to know about the hosting class. - protected final AtomicBoolean stopRequested = new AtomicBoolean(false); + protected volatile AtomicBoolean stopRequested = new AtomicBoolean(false); - protected final AtomicBoolean quiesced = new AtomicBoolean(false); + protected volatile AtomicBoolean quiesced = new AtomicBoolean(false); // Go down hard. 
Used if file system becomes unavailable and also in // debugging and unit tests. @@ -95,13 +95,13 @@ private final Random rand = new Random(); // region name -> HRegion - protected final SortedMap<Text, HRegion> onlineRegions = + protected volatile SortedMap<Text, HRegion> onlineRegions = Collections.synchronizedSortedMap(new TreeMap<Text, HRegion>()); - protected final Map<Text, HRegion> retiringRegions = + protected volatile Map<Text, HRegion> retiringRegions = new ConcurrentHashMap<Text, HRegion>(); protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final List<HMsg> outboundMsgs = + private volatile List<HMsg> outboundMsgs = Collections.synchronizedList(new ArrayList<HMsg>()); final int numRetries; @@ -120,7 +120,7 @@ private final Leases leases; // Request counter - private final AtomicInteger requestCount = new AtomicInteger(); + private volatile AtomicInteger requestCount = new AtomicInteger(); // A sleeper that sleeps for msgInterval. private final Sleeper sleeper; @@ -296,7 +296,7 @@ // splitting a 'normal' region, and the ROOT table needs to be // updated if we are splitting a META region. HTable t = null; - if (region.getRegionInfo().getTableDesc().getName().equals(META_TABLE_NAME)) { + if (region.getRegionInfo().isMetaTable()) { // We need to update the root region if (this.root == null) { this.root = new HTable(conf, ROOT_TABLE_NAME);