Author: cutting
Date: Fri May 11 14:14:23 2007
New Revision: 537293

URL: http://svn.apache.org/viewvc?view=rev&rev=537293
Log:
Merge -r 537291:537292 from trunk to 0.13 branch.  Fixes: HADOOP-1341.
Modified:
    lucene/hadoop/branches/branch-0.13/CHANGES.txt
    lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
    lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java
    lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java
    lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
    lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LockException.java
    lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java
    lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java

Modified: lucene/hadoop/branches/branch-0.13/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/CHANGES.txt?view=diff&rev=537293&r1=537292&r2=537293
==============================================================================
--- lucene/hadoop/branches/branch-0.13/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.13/CHANGES.txt Fri May 11 14:14:23 2007
@@ -369,6 +369,9 @@
      underlying FileSystem, correctly aborting files being
      written.  (Hairong Kuang via cutting)
 
+110. HADOOP-1341.  Fix intermittent failures in HBase unit tests
+     caused by deadlock.  (Jim Kellerman via cutting)
+
 
 Release 0.12.3 - 2007-04-06
Modified: lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java?view=diff&rev=537293&r1=537292&r2=537293
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java Fri May 11 14:14:23 2007
@@ -104,24 +104,61 @@
   /**
    * Check client is open.
    */
-  private synchronized void checkOpen() {
+  private void checkOpen() {
     if (this.closed) {
       throw new IllegalStateException("client is not open");
     }
   }
 
-  private synchronized void checkMaster() throws IOException {
+  /**
+   * Find the address of the master and connect to it
+   */
+  private void checkMaster() {
     if (this.master != null) {
       return;
     }
-    HServerAddress masterLocation =
-      new HServerAddress(this.conf.get(MASTER_ADDRESS));
-    this.master = (HMasterInterface)RPC.getProxy(HMasterInterface.class,
-      HMasterInterface.versionID, masterLocation.getInetSocketAddress(), this.conf);
+    for(int tries = 0; this.master == null && tries < numRetries; tries++) {
+      HServerAddress masterLocation =
+        new HServerAddress(this.conf.get(MASTER_ADDRESS));
+
+      try {
+        HMasterInterface tryMaster =
+          (HMasterInterface)RPC.getProxy(HMasterInterface.class,
+            HMasterInterface.versionID, masterLocation.getInetSocketAddress(),
+            this.conf);
+
+        if(tryMaster.isMasterRunning()) {
+          this.master = tryMaster;
+          break;
+        }
+      } catch(IOException e) {
+        if(tries == numRetries - 1) {
+          // This was our last chance - don't bother sleeping
+          break;
+        }
+      }
+
+      // We either cannot connect to the master or it is not running.
+      // Sleep and retry
+
+      try {
+        Thread.sleep(this.clientTimeout);
+
+      } catch(InterruptedException e) {
+      }
+    }
+    if(this.master == null) {
+      throw new IllegalStateException("Master is not running");
+    }
   }
 
-  public synchronized void createTable(HTableDescriptor desc)
-    throws IOException {
+  public synchronized void createTable(HTableDescriptor desc) throws IOException {
+    if(desc.getName().equals(ROOT_TABLE_NAME)
+        || desc.getName().equals(META_TABLE_NAME)) {
+
+      throw new IllegalArgumentException(desc.getName().toString() +
+          " is a reserved table name");
+    }
     checkOpen();
     checkMaster();
     locateRootRegion();
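The retry loop above is the client-side heart of the fix: instead of assuming the first proxy is valid, HClient now probes isMasterRunning() up to numRetries times, sleeping clientTimeout between attempts. A minimal self-contained sketch of the same pattern; the Probe interface and every name in it are illustrative stand-ins, not HBase API:

    import java.io.IOException;

    // Sketch of the checkMaster() retry pattern above. `Probe`, `numRetries`
    // and `sleepMs` stand in for HMasterInterface.isMasterRunning(),
    // HClient.numRetries and HClient.clientTimeout.
    public class MasterRetrySketch {
      interface Probe {
        boolean isRunning() throws IOException;
      }

      static void waitForService(Probe probe, int numRetries, long sleepMs) {
        boolean running = false;
        for(int tries = 0; !running && tries < numRetries; tries++) {
          try {
            if(probe.isRunning()) {
              running = true;
              break;
            }
          } catch(IOException e) {
            if(tries == numRetries - 1) {
              break;                     // last chance - don't bother sleeping
            }
          }
          // Either we cannot connect or the service is not ready: sleep, retry
          try {
            Thread.sleep(sleepMs);
          } catch(InterruptedException e) {
          }
        }
        if(!running) {
          throw new IllegalStateException("Master is not running");
        }
      }

      public static void main(String[] args) {
        waitForService(new Probe() {
          public boolean isRunning() { return true; }  // trivially succeeds
        }, 3, 100L);
        System.out.println("connected");
      }
    }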
Modified: lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java?view=diff&rev=537293&r1=537292&r2=537293
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java Fri May 11 14:14:23 2007
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+/*******************************************************************************
+ * HLocking is a set of lock primitives that does not rely on a
+ * particular thread holding the monitor for an object. This is
+ * especially important when a lock must persist over multiple RPC's
+ * since there is no guarantee that the same Server thread will handle
+ * all the RPC's until the lock is released.
+ *
+ * For each independent entity that needs locking, create a new
+ * HLocking instance.
+ *
+ ******************************************************************************/
+public class HLocking {
+  private Integer mutex;
+
+  // If lockers == 0, the lock is unlocked
+  // If lockers > 0, locked for read
+  // If lockers == -1 locked for write
+
+  private int lockers;
+
+  /** Constructor */
+  public HLocking() {
+    this.mutex = new Integer(0);
+    this.lockers = 0;
+  }
+
+  /**
+   * Caller needs the nonexclusive read-lock
+   */
+  public void obtainReadLock() {
+    synchronized(mutex) {
+      while(lockers < 0) {
+        try {
+          mutex.wait();
+        } catch(InterruptedException ie) {
+        }
+      }
+      lockers++;
+      mutex.notifyAll();
+    }
+  }
+
+  /**
+   * Caller is finished with the nonexclusive read-lock
+   */
+  public void releaseReadLock() {
+    synchronized(mutex) {
+      lockers--;
+      if(lockers < 0) {
+        throw new IllegalStateException("lockers: " + lockers);
+      }
+      mutex.notifyAll();
+    }
+  }
+
+  /**
+   * Caller needs the exclusive write-lock
+   */
+  public void obtainWriteLock() {
+    synchronized(mutex) {
+      while(lockers != 0) {
+        try {
+          mutex.wait();
+        } catch (InterruptedException ie) {
+        }
+      }
+      lockers = -1;
+      mutex.notifyAll();
+    }
+  }
+
+  /**
+   * Caller is finished with the write lock
+   */
+  public void releaseWriteLock() {
+    synchronized(mutex) {
+      if(lockers != -1) {
+        throw new IllegalStateException("lockers: " + lockers);
+      }
+      lockers = 0;
+      mutex.notifyAll();
+    }
+  }
+}
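The property that matters in the class above: lock state is just a counter guarded by a private monitor, so no thread owns the lock and a release may come from a different thread than the acquire, which is exactly what a lock spanning multiple RPC handler threads needs. A minimal usage sketch, assuming the HLocking class above is on the classpath:

    // Usage sketch: the releasing thread need not be the thread that
    // obtained the lock, unlike java.util.concurrent's read-write locks.
    public class HLockingDemo {
      public static void main(String[] args) throws InterruptedException {
        final HLocking lock = new HLocking();

        lock.obtainWriteLock();               // main thread takes the write lock
        Thread releaser = new Thread(new Runnable() {
          public void run() {
            lock.releaseWriteLock();          // a different thread releases it
          }
        });
        releaser.start();
        releaser.join();

        lock.obtainReadLock();                // readers may now proceed
        lock.releaseReadLock();
        System.out.println("done");
      }
    }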
Modified: lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=diff&rev=537293&r1=537292&r2=537293
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Fri May 11 14:14:23 2007
@@ -258,16 +258,18 @@
         if (waitForRootRegionOrClose()) {
           continue;
         }
-        rootScanned = false;
-        // Make a MetaRegion instance for ROOT region to pass scanRegion.
-        MetaRegion mr = new MetaRegion();
-        mr.regionName = HGlobals.rootRegionInfo.regionName;
-        mr.server = HMaster.this.rootRegionLocation;
-        mr.startKey = null;
-        if (scanRegion(mr)) {
-          numMetaRegions += 1;
+        synchronized(rootScannerLock) { // Don't interrupt us while we're working
+          rootScanned = false;
+          // Make a MetaRegion instance for ROOT region to pass scanRegion.
+          MetaRegion mr = new MetaRegion();
+          mr.regionName = HGlobals.rootRegionInfo.regionName;
+          mr.server = HMaster.this.rootRegionLocation;
+          mr.startKey = null;
+          if (scanRegion(mr)) {
+            numMetaRegions += 1;
+          }
+          rootScanned = true;
         }
-        rootScanned = true;
         try {
           if (LOG.isDebugEnabled()) {
             LOG.debug("RootScanner going to sleep");
@@ -291,6 +293,7 @@
 
   private RootScanner rootScanner;
   private Thread rootScannerThread;
+  private Integer rootScannerLock = new Integer(0);
 
   private class MetaRegion {
     public HServerAddress server;
@@ -347,14 +350,16 @@
             continue;
           }
           try {
-            scanRegion(region);
-            knownMetaRegions.put(region.startKey, region);
-            if (rootScanned && knownMetaRegions.size() == numMetaRegions) {
-              if(LOG.isDebugEnabled()) {
-                LOG.debug("all meta regions scanned");
+            synchronized(metaScannerLock) { // Don't interrupt us while we're working
+              scanRegion(region);
+              knownMetaRegions.put(region.startKey, region);
+              if (rootScanned && knownMetaRegions.size() == numMetaRegions) {
+                if(LOG.isDebugEnabled()) {
+                  LOG.debug("all meta regions scanned");
+                }
+                allMetaRegionsScanned = true;
+                metaRegionsScanned();
               }
-              allMetaRegionsScanned = true;
-              metaRegionsScanned();
             }
 
             do {
@@ -375,10 +380,13 @@
           }
 
           // Rescan the known meta regions every so often
-          Vector<MetaRegion> v = new Vector<MetaRegion>();
-          v.addAll(knownMetaRegions.values());
-          for(Iterator<MetaRegion> i = v.iterator(); i.hasNext(); ) {
-            scanRegion(i.next());
+
+          synchronized(metaScannerLock) { // Don't interrupt us while we're working
+            Vector<MetaRegion> v = new Vector<MetaRegion>();
+            v.addAll(knownMetaRegions.values());
+            for(Iterator<MetaRegion> i = v.iterator(); i.hasNext(); ) {
+              scanRegion(i.next());
+            }
           }
 
         } while(true);
@@ -417,6 +425,7 @@
 
   private MetaScanner metaScanner;
   private Thread metaScannerThread;
+  private Integer metaScannerLock = new Integer(0);
 
   // The 'unassignedRegions' table maps from a region name to a HRegionInfo record,
   // which includes the region's table, its id, and its start/end keys.
@@ -598,9 +607,13 @@
      */
 
     // Wake other threads so they notice the close
-
-    rootScannerThread.interrupt();
-    metaScannerThread.interrupt();
+
+    synchronized(rootScannerLock) {
+      rootScannerThread.interrupt();
+    }
+    synchronized(metaScannerLock) {
+      metaScannerThread.interrupt();
+    }
     server.stop();                              // Stop server
     serverLeases.close();                       // Turn off the lease monitor
     try {

Modified: lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java?view=diff&rev=537293&r1=537292&r2=537293
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java Fri May 11 14:14:23 2007
@@ -29,6 +29,12 @@
   public static final long versionID = 1L; // initial version
 
   //////////////////////////////////////////////////////////////////////////////
+  // Check to see if master is available
+  //////////////////////////////////////////////////////////////////////////////
+
+  public boolean isMasterRunning();
+
+  //////////////////////////////////////////////////////////////////////////////
   // Admin tools would use these cmds
   //////////////////////////////////////////////////////////////////////////////
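The rootScannerLock/metaScannerLock pairs above follow one idiom, repeated below for HRegionServer's worker threads: the scanner holds a monitor for its whole critical section, and the shutdown path takes the same monitor before calling interrupt(), so an interrupt can only be delivered while the scanner is sleeping, never mid-scan. A self-contained sketch of the idiom; all names here are illustrative, not HBase API:

    // Interrupt-fencing sketch: interrupt() is only callable while the
    // worker is outside its critical section (i.e. sleeping).
    public class InterruptFence {
      private static final Integer workLock = new Integer(0);

      public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(new Runnable() {
          public void run() {
            while(!Thread.currentThread().isInterrupted()) {
              synchronized(workLock) {
                // critical section: shutdown needs workLock to call
                // interrupt(), so it cannot fire while we hold it
              }
              try {
                Thread.sleep(1000);      // interrupts land here instead
              } catch(InterruptedException e) {
                return;                  // woken for shutdown
              }
            }
          }
        });
        worker.start();

        synchronized(workLock) {         // shutdown: fence, then interrupt
          worker.interrupt();
        }
        worker.join();
      }
    }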
Modified: lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?view=diff&rev=537293&r1=537292&r2=537293
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Fri May 11 14:14:23 2007
@@ -22,8 +22,6 @@
 
 import java.io.*;
 import java.util.*;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /*******************************************************************************
  * The HMemcache holds in-memory modifications to the HRegion.  This is really a
@@ -40,7 +38,7 @@
 
   TreeMap<HStoreKey, BytesWritable> snapshot = null;
 
-  ReadWriteLock locker = new ReentrantReadWriteLock();
+  HLocking lock = new HLocking();
 
   public HMemcache() {
   }
@@ -69,7 +67,7 @@
   public Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
     Snapshot retval = new Snapshot();
 
-    this.locker.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       if(snapshot != null) {
         throw new IOException("Snapshot in progress!");
@@ -98,7 +96,7 @@
       return retval;
 
     } finally {
-      this.locker.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
 
@@ -108,7 +106,7 @@
    * Modifying the structure means we need to obtain a writelock.
    */
   public void deleteSnapshot() throws IOException {
-    this.locker.writeLock().lock();
+    this.lock.obtainWriteLock();
 
     try {
       if(snapshot == null) {
@@ -134,7 +132,7 @@
       }
 
     } finally {
-      this.locker.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
 
@@ -144,7 +142,7 @@
    * Operation uses a write lock.
    */
   public void add(Text row, TreeMap<Text, BytesWritable> columns, long timestamp) {
-    this.locker.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       for(Iterator<Text> it = columns.keySet().iterator(); it.hasNext(); ) {
         Text column = it.next();
@@ -155,7 +153,7 @@
       }
 
     } finally {
-      this.locker.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
 
@@ -166,7 +164,7 @@
    */
   public BytesWritable[] get(HStoreKey key, int numVersions) {
     Vector<BytesWritable> results = new Vector<BytesWritable>();
-    this.locker.readLock().lock();
+    this.lock.obtainReadLock();
    try {
      Vector<BytesWritable> result = get(memcache, key, numVersions-results.size());
      results.addAll(0, result);
@@ -188,7 +186,7 @@
       }
 
     } finally {
-      this.locker.readLock().unlock();
+      this.lock.releaseReadLock();
     }
   }
 
@@ -200,7 +198,7 @@
    */
   public TreeMap<Text, BytesWritable> getFull(HStoreKey key) throws IOException {
     TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
-    this.locker.readLock().lock();
+    this.lock.obtainReadLock();
     try {
       internalGetFull(memcache, key, results);
       for(int i = history.size()-1; i >= 0; i--) {
@@ -210,7 +208,7 @@
       return results;
 
     } finally {
-      this.locker.readLock().unlock();
+      this.lock.releaseReadLock();
     }
   }
 
@@ -287,7 +285,7 @@
 
       super(timestamp, targetCols);
 
-      locker.readLock().lock();
+      lock.obtainReadLock();
       try {
         this.backingMaps = new TreeMap[history.size() + 1];
@@ -377,7 +375,7 @@
         }
 
       } finally {
-        locker.readLock().unlock();
+        lock.releaseReadLock();
         scannerClosed = true;
       }
     }
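The mechanical locker-to-lock substitutions above are not cosmetic. A ReentrantReadWriteLock write lock is owned by the thread that acquired it, so releasing it from another thread throws IllegalMonitorStateException; thread-pinned lock state is exactly what the HLocking class comment warns about for locks that must survive across RPC handler threads. A short demonstration of that failure mode:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // The failure mode that motivates HLocking: cross-thread release of a
    // ReentrantReadWriteLock write lock is rejected.
    public class CrossThreadUnlock {
      public static void main(String[] args) throws InterruptedException {
        final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        rwl.writeLock().lock();                  // owned by main
        Thread other = new Thread(new Runnable() {
          public void run() {
            try {
              rwl.writeLock().unlock();          // not the owner...
            } catch(IllegalMonitorStateException e) {
              System.out.println("unlock refused: " + e);
            }
          }
        });
        other.start();
        other.join();
        rwl.writeLock().unlock();                // the owner may release
      }
    }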
* @@ -643,6 +662,9 @@ */ public void optionallyFlush() throws IOException { if(commitsSinceFlush > maxUnflushedEntries) { + if (LOG.isDebugEnabled()) { + LOG.debug("Flushing cache. Number of commits is: " + commitsSinceFlush); + } flushcache(false); } } Modified: lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=537293&r1=537292&r2=537293 ============================================================================== --- lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original) +++ lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Fri May 11 14:14:23 2007 @@ -25,8 +25,6 @@ import java.io.*; import java.util.*; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; /******************************************************************************* * HRegionServer makes a set of HRegions available to clients. It checks in with @@ -52,7 +50,7 @@ private Configuration conf; private Random rand; private TreeMap<Text, HRegion> regions; // region name -> HRegion - private ReadWriteLock locker; + private HLocking lock; private Vector<HMsg> outboundMsgs; private long threadWakeFrequency; @@ -61,11 +59,12 @@ // Check to see if regions should be split - private long splitCheckFrequency; - private SplitChecker splitChecker; - private Thread splitCheckerThread; + private long splitOrCompactCheckFrequency; + private SplitOrCompactChecker splitOrCompactChecker; + private Thread splitOrCompactCheckerThread; + private Integer splitOrCompactLock = new Integer(0); - private class SplitChecker implements Runnable { + private class SplitOrCompactChecker implements Runnable { private HClient client = new HClient(conf); private class SplitRegion { @@ -82,116 +81,122 @@ while(! 
Modified: lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=537293&r1=537292&r2=537293
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Fri May 11 14:14:23 2007
@@ -25,8 +25,6 @@
 
 import java.io.*;
 import java.util.*;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /*******************************************************************************
  * HRegionServer makes a set of HRegions available to clients.  It checks in with
@@ -52,7 +50,7 @@
   private Configuration conf;
   private Random rand;
   private TreeMap<Text, HRegion> regions;               // region name -> HRegion
-  private ReadWriteLock locker;
+  private HLocking lock;
   private Vector<HMsg> outboundMsgs;
 
   private long threadWakeFrequency;
@@ -61,11 +59,12 @@
 
   // Check to see if regions should be split
 
-  private long splitCheckFrequency;
-  private SplitChecker splitChecker;
-  private Thread splitCheckerThread;
+  private long splitOrCompactCheckFrequency;
+  private SplitOrCompactChecker splitOrCompactChecker;
+  private Thread splitOrCompactCheckerThread;
+  private Integer splitOrCompactLock = new Integer(0);
 
-  private class SplitChecker implements Runnable {
+  private class SplitOrCompactChecker implements Runnable {
     private HClient client = new HClient(conf);
 
     private class SplitRegion {
updating meta"); + } + + client.openTable(tableToUpdate); + long lockid = client.startUpdate(oldRegion); + client.delete(lockid, COL_REGIONINFO); + client.delete(lockid, COL_SERVER); + client.delete(lockid, COL_STARTCODE); client.commit(lockid); + + for(int i = 0; i < newRegions.length; i++) { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(bytes); + newRegions[i].getRegionInfo().write(out); + + lockid = client.startUpdate(newRegions[i].getRegionName()); + client.put(lockid, COL_REGIONINFO, bytes.toByteArray()); + client.commit(lockid); + } + + // Now tell the master about the new regions + + if(LOG.isDebugEnabled()) { + LOG.debug("reporting region split to master"); + } + + reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo()); + + LOG.info("region split successful. old region=" + oldRegion + + ", new regions: " + newRegions[0].getRegionName() + ", " + + newRegions[1].getRegionName()); + + newRegions[0].close(); + newRegions[1].close(); + } - - // Now tell the master about the new regions - - if(LOG.isDebugEnabled()) { - LOG.debug("reporting region split to master"); - } - - reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo()); - - LOG.info("region split successful. old region=" + oldRegion - + ", new regions: " + newRegions[0].getRegionName() + ", " - + newRegions[1].getRegionName()); - - newRegions[0].close(); - newRegions[1].close(); - } catch(IOException e) { //TODO: What happens if this fails? Are we toast? - e.printStackTrace(); - continue; + LOG.error(e); } } // Sleep long waitTime = stopRequested ? 0 - : splitCheckFrequency - (System.currentTimeMillis() - startTime); + : splitOrCompactCheckFrequency - (System.currentTimeMillis() - startTime); if (waitTime > 0) { try { if (LOG.isDebugEnabled()) { - LOG.debug("Sleep splitChecker"); + LOG.debug("Sleep splitOrCompactChecker"); } Thread.sleep(waitTime); if (LOG.isDebugEnabled()) { - LOG.debug("Wake splitChecker"); + LOG.debug("Wake splitOrCompactChecker"); } } catch(InterruptedException iex) { } } } if(LOG.isDebugEnabled()) { - LOG.debug("splitChecker exiting"); + LOG.debug("splitOrCompactChecker exiting"); } } } @@ -200,35 +205,39 @@ private Flusher cacheFlusher; private Thread cacheFlusherThread; + private Integer cacheFlusherLock = new Integer(0); private class Flusher implements Runnable { public void run() { while(! stopRequested) { long startTime = System.currentTimeMillis(); - // Grab a list of items to flush - - Vector<HRegion> toFlush = new Vector<HRegion>(); - locker.readLock().lock(); - try { - toFlush.addAll(regions.values()); - - } finally { - locker.readLock().unlock(); - } + synchronized(cacheFlusherLock) { - // Flush them, if necessary + // Grab a list of items to flush - for(Iterator<HRegion> it = toFlush.iterator(); it.hasNext(); ) { - HRegion cur = it.next(); - + Vector<HRegion> toFlush = new Vector<HRegion>(); + lock.obtainReadLock(); try { - cur.optionallyFlush(); - - } catch(IOException iex) { - iex.printStackTrace(); + toFlush.addAll(regions.values()); + + } finally { + lock.releaseReadLock(); } - } + // Flush them, if necessary + + for(Iterator<HRegion> it = toFlush.iterator(); it.hasNext(); ) { + HRegion cur = it.next(); + + try { + cur.optionallyFlush(); + + } catch(IOException iex) { + iex.printStackTrace(); + } + } + } + // Sleep long waitTime = stopRequested ? 
@@ -200,35 +205,39 @@
 
   private Flusher cacheFlusher;
   private Thread cacheFlusherThread;
+  private Integer cacheFlusherLock = new Integer(0);
   private class Flusher implements Runnable {
     public void run() {
       while(! stopRequested) {
         long startTime = System.currentTimeMillis();
 
-        // Grab a list of items to flush
-
-        Vector<HRegion> toFlush = new Vector<HRegion>();
-        locker.readLock().lock();
-        try {
-          toFlush.addAll(regions.values());
-
-        } finally {
-          locker.readLock().unlock();
-        }
+        synchronized(cacheFlusherLock) {
 
-        // Flush them, if necessary
+          // Grab a list of items to flush
 
-        for(Iterator<HRegion> it = toFlush.iterator(); it.hasNext(); ) {
-          HRegion cur = it.next();
-
+          Vector<HRegion> toFlush = new Vector<HRegion>();
+          lock.obtainReadLock();
           try {
-            cur.optionallyFlush();
-
-          } catch(IOException iex) {
-            iex.printStackTrace();
+            toFlush.addAll(regions.values());
+
+          } finally {
+            lock.releaseReadLock();
           }
-        }
 
+          // Flush them, if necessary
+
+          for(Iterator<HRegion> it = toFlush.iterator(); it.hasNext(); ) {
+            HRegion cur = it.next();
+
+            try {
+              cur.optionallyFlush();
+
+            } catch(IOException iex) {
+              iex.printStackTrace();
+            }
+          }
+        }
+
         // Sleep
         long waitTime = stopRequested ? 0
           : threadWakeFrequency - (System.currentTimeMillis() - startTime);
@@ -262,18 +271,25 @@
 
   private HLog log;
   private LogRoller logRoller;
   private Thread logRollerThread;
+  private Integer logRollerLock = new Integer(0);
   private class LogRoller implements Runnable {
     public void run() {
       while(! stopRequested) {
-        // If the number of log entries is high enough, roll the log. This is a
-        // very fast operation, but should not be done too frequently.
-        if(log.getNumEntries() > maxLogEntries) {
-          try {
-            log.rollWriter();
-          } catch(IOException iex) {
+        synchronized(logRollerLock) {
+          // If the number of log entries is high enough, roll the log. This is a
+          // very fast operation, but should not be done too frequently.
+          int nEntries = log.getNumEntries();
+          if(nEntries > maxLogEntries) {
+            try {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Rolling log. Number of entries is: " + nEntries);
+              }
+              log.rollWriter();
+            } catch(IOException iex) {
+            }
           }
         }
-
+
         if(!stopRequested) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Sleep logRoller");
@@ -323,7 +339,7 @@
     this.conf = conf;
     this.rand = new Random();
     this.regions = new TreeMap<Text, HRegion>();
-    this.locker = new ReentrantReadWriteLock();
+    this.lock = new HLocking();
     this.outboundMsgs = new Vector<HMsg>();
     this.scanners =
       Collections.synchronizedMap(new TreeMap<Text, HInternalScannerInterface>());
@@ -333,8 +349,8 @@
     this.maxLogEntries = conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
     this.msgInterval = conf.getLong("hbase.regionserver.msginterval", 15 * 1000);
-    this.splitCheckFrequency =
-      conf.getLong("hbase.regionserver.thread.splitcheckfrequency", 60 * 1000);
+    this.splitOrCompactCheckFrequency =
+      conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency", 60 * 1000);
 
     // Cache flushing
     this.cacheFlusher = new Flusher();
     this.cacheFlusherThread =
       new Thread(cacheFlusher, "HRegionServer.cacheFlusher");
 
     // Check regions to see if they need to be split
-    this.splitChecker = new SplitChecker();
-    this.splitCheckerThread =
-      new Thread(splitChecker, "HRegionServer.splitChecker");
+    this.splitOrCompactChecker = new SplitOrCompactChecker();
+    this.splitOrCompactCheckerThread =
+      new Thread(splitOrCompactChecker, "HRegionServer.splitOrCompactChecker");
 
     // Process requests from Master
     this.toDo = new Vector<HMsg>();
@@ -386,7 +402,7 @@
     // Threads
     this.workerThread.start();
     this.cacheFlusherThread.start();
-    this.splitCheckerThread.start();
+    this.splitOrCompactCheckerThread.start();
     this.logRollerThread.start();
     this.leases = new Leases(conf.getLong("hbase.regionserver.lease.period",
         3 * 60 * 1000), threadWakeFrequency);
@@ -429,14 +445,14 @@
     } catch(InterruptedException iex) {
     }
     try {
-      this.splitCheckerThread.join();
+      this.splitOrCompactCheckerThread.join();
     } catch(InterruptedException iex) {
     }
     try {
       this.server.join();
     } catch(InterruptedException iex) {
     }
-    LOG.info("server stopped at: " + address.toString());
+    LOG.info("HRegionServer stopped at: " + address.toString());
   }
 
   /**
@@ -568,9 +584,17 @@
 
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
 
-    this.logRollerThread.interrupt();
-    this.cacheFlusherThread.interrupt();
-    this.splitCheckerThread.interrupt();
+    synchronized(logRollerLock) {
+      this.logRollerThread.interrupt();
+    }
+
+    synchronized(cacheFlusherLock) {
+      this.cacheFlusherThread.interrupt();
+    }
+
+    synchronized(splitOrCompactLock) {
+      this.splitOrCompactCheckerThread.interrupt();
+    }
 
     this.worker.stop();
     this.server.stop();
@@ -721,7 +745,7 @@
   }
 
   private void openRegion(HRegionInfo regionInfo) throws IOException {
-    this.locker.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       HRegion region =
         new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile);
@@ -729,14 +753,14 @@
       reportOpen(region);
 
     } finally {
-      this.locker.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
 
   private void closeRegion(HRegionInfo info, boolean reportWhenCompleted)
       throws IOException {
-    this.locker.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       HRegion region = regions.remove(info.regionName);
 
@@ -749,13 +773,13 @@
       }
 
     } finally {
-      this.locker.writeLock().unlock();
+      this.lock.releaseWriteLock();
    }
  }
 
   private void closeAndDeleteRegion(HRegionInfo info) throws IOException {
-    this.locker.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       HRegion region = regions.remove(info.regionName);
 
@@ -764,13 +788,13 @@
       }
 
     } finally {
-      this.locker.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
 
   /** Called either when the master tells us to restart or from stop() */
   private void closeAllRegions() {
-    this.locker.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       for(Iterator<HRegion> it = regions.values().iterator(); it.hasNext(); ) {
         HRegion region = it.next();
@@ -787,7 +811,7 @@
       regions.clear();
 
     } finally {
-      this.locker.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
@@ -1005,12 +1029,12 @@
 
   /** Private utility method for safely obtaining an HRegion handle.
    */
   private HRegion getRegion(Text regionName) {
-    this.locker.readLock().lock();
+    this.lock.obtainReadLock();
     try {
       return regions.get(regionName);
 
     } finally {
-      this.locker.readLock().unlock();
+      this.lock.releaseReadLock();
     }
   }

Modified: lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?view=diff&rev=537293&r1=537292&r2=537293
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Fri May 11 14:14:23 2007
@@ -23,8 +23,6 @@
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.Vector;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,7 +63,7 @@
 
   Integer compactLock = new Integer(0);
   Integer flushLock = new Integer(0);
 
-  ReadWriteLock locker = new ReentrantReadWriteLock();
+  HLocking lock = new HLocking();
 
   TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>();
   TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>();
@@ -240,7 +238,7 @@
 
   /** Turn off all the MapFile readers */
   public void close() throws IOException {
-    this.locker.writeLock().lock();
+    this.lock.obtainWriteLock();
     LOG.info("closing HStore for " + this.regionName + "/" + this.colFamily);
 
     try {
@@ -254,7 +252,7 @@
       LOG.info("HStore closed for " + this.regionName + "/" + this.colFamily);
 
     } finally {
-      this.locker.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
 
@@ -326,7 +324,7 @@
       // C. Finally, make the new MapFile available.
 
       if(addToAvailableMaps) {
-        this.locker.writeLock().lock();
+        this.lock.obtainWriteLock();
 
         try {
           maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf));
@@ -337,20 +335,15 @@
           }
 
         } finally {
-          this.locker.writeLock().unlock();
+          this.lock.releaseWriteLock();
        }
      }
      return getAllMapFiles();
    }
  }
 
-  public Vector<HStoreFile> getAllMapFiles() {
-    Vector<HStoreFile> flushedFiles = new Vector<HStoreFile>();
-    for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
-      HStoreFile hsf = it.next();
-      flushedFiles.add(hsf);
-    }
-    return flushedFiles;
+  public synchronized Vector<HStoreFile> getAllMapFiles() {
+    return new Vector<HStoreFile>(mapFiles.values());
   }
 
   //////////////////////////////////////////////////////////////////////////////
 
@@ -391,12 +384,12 @@
 
     // Grab a list of files to compact.
 
     Vector<HStoreFile> toCompactFiles = null;
-    this.locker.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
 
     } finally {
-      this.locker.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
 
     // Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps
@@ -631,7 +624,7 @@
 
     // 1. Acquiring the write-lock
 
-    this.locker.writeLock().lock();
+    this.lock.obtainWriteLock();
     Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily);
     try {
       Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
@@ -749,7 +742,7 @@
 
     // 7. Releasing the write-lock
 
-    this.locker.writeLock().unlock();
+    this.lock.releaseWriteLock();
     }
   }
 
@@ -765,7 +758,7 @@
    * The returned object should map column names to byte arrays (byte[]).
    */
   public void getFull(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException {
-    this.locker.readLock().lock();
+    this.lock.obtainReadLock();
     try {
       MapFile.Reader[] maparray
         = maps.values().toArray(new MapFile.Reader[maps.size()]);
@@ -794,7 +787,7 @@
       }
 
     } finally {
-      this.locker.readLock().unlock();
+      this.lock.releaseReadLock();
     }
   }
 
@@ -810,7 +803,7 @@
     }
 
     Vector<BytesWritable> results = new Vector<BytesWritable>();
-    this.locker.readLock().lock();
+    this.lock.obtainReadLock();
     try {
       MapFile.Reader[] maparray
         = maps.values().toArray(new MapFile.Reader[maps.size()]);
@@ -851,7 +844,7 @@
       }
 
     } finally {
-      this.locker.readLock().unlock();
+      this.lock.releaseReadLock();
     }
   }
 
@@ -860,41 +853,58 @@
    *
    * @param midKey - the middle key for the largest MapFile
    * @return - size of the largest MapFile
-   * @throws IOException
    */
-  public long getLargestFileSize(Text midKey) throws IOException {
+  public long getLargestFileSize(Text midKey) {
     long maxSize = 0L;
     if (this.mapFiles.size() <= 0) {
       return maxSize;
     }
-
-    long mapIndex = 0L;
+    this.lock.obtainReadLock();
+    try {
+      long mapIndex = 0L;
 
-    // Iterate through all the MapFiles
-
-    for(Iterator<Map.Entry<Long, HStoreFile>> it = mapFiles.entrySet().iterator();
-        it.hasNext(); ) {
-
-      Map.Entry<Long, HStoreFile> e = it.next();
-      HStoreFile curHSF = e.getValue();
-      long size = fs.getLength(new Path(curHSF.getMapFilePath(), MapFile.DATA_FILE_NAME));
-
-      if(size > maxSize) {              // This is the largest one so far
-        maxSize = size;
-        mapIndex = e.getKey();
+      // Iterate through all the MapFiles
+
+      for(Iterator<Map.Entry<Long, HStoreFile>> it = mapFiles.entrySet().iterator();
+          it.hasNext(); ) {
+
+        Map.Entry<Long, HStoreFile> e = it.next();
+        HStoreFile curHSF = e.getValue();
+        long size = fs.getLength(new Path(curHSF.getMapFilePath(), MapFile.DATA_FILE_NAME));
+
+        if(size > maxSize) {            // This is the largest one so far
+          maxSize = size;
+          mapIndex = e.getKey();
+        }
       }
-    }
 
-    MapFile.Reader r = maps.get(mapIndex);
-
-    synchronized(r) {
+      MapFile.Reader r = maps.get(mapIndex);
+
+      midKey.set(((HStoreKey)r.midKey()).getRow());
+
+    } catch(IOException e) {
+      LOG.warn(e);
+
+    } finally {
+      this.lock.releaseReadLock();
     }
-
     return maxSize;
   }
 
+  /**
+   * @return Returns the number of map files currently in use
+   */
+  public int getNMaps() {
+    this.lock.obtainReadLock();
+    try {
+      return maps.size();
+
+    } finally {
+      this.lock.releaseReadLock();
+    }
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // File administration
   //////////////////////////////////////////////////////////////////////////////
@@ -920,27 +930,6 @@
     return new HStoreScanner(timestamp, targetCols, firstRow);
   }
 
-  /** For debuging purposes. Dumps the keys from all the MapFiles */
-  void dumpMaps() throws IOException {
-    this.locker.readLock().lock();
-    try {
-      for(Iterator<MapFile.Reader> i = maps.values().iterator(); i.hasNext(); ) {
-        MapFile.Reader r = i.next();
-        synchronized(r) {
-          r.reset();
-          HStoreKey key = new HStoreKey();
-          BytesWritable val = new BytesWritable();
-          while(r.next(key, val)) {
-            System.out.println(key);
-          }
-        }
-      }
-
-    } finally {
-      this.locker.readLock().unlock();
-    }
-  }
-
   //////////////////////////////////////////////////////////////////////////////
   // This class implements the HScannerInterface.
   // It lets the caller scan the contents of this HStore.
   //////////////////////////////////////////////////////////////////////////////
 
@@ -952,7 +941,7 @@
     public HStoreScanner(long timestamp, Text[] targetCols, Text firstRow) throws IOException {
       super(timestamp, targetCols);
 
-      locker.readLock().lock();
+      lock.obtainReadLock();
       try {
         this.readers = new MapFile.Reader[mapFiles.size()];
@@ -1056,7 +1045,7 @@
         }
 
       } finally {
-        locker.readLock().unlock();
+        lock.releaseReadLock();
         scannerClosed = true;
      }
    }

Modified: lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LockException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LockException.java?view=diff&rev=537293&r1=537292&r2=537293
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LockException.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LockException.java Fri May 11 14:14:23 2007
@@ -18,6 +18,7 @@
 import java.io.IOException;
 
 public class LockException extends IOException {
+  private static final long serialVersionUID = 1L << 13 - 1L;
   public LockException() {
     super();
   }
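A side note on the serialVersionUID just added: shift binds more loosely than subtraction in Java, so 1L << 13 - 1L parses as 1L << (13 - 1L) and evaluates to 4096, not the 8191 that (1L << 13) - 1L would give. Any constant is a legal version ID, so behavior is unaffected either way; the sketch below just makes the precedence visible:

    // Precedence check for the serialVersionUID expression above.
    public class ShiftPrecedence {
      public static void main(String[] args) {
        System.out.println(1L << 13 - 1L);     // 4096: parsed as 1L << (13 - 1L)
        System.out.println((1L << 13) - 1L);   // 8191
      }
    }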
Modified: lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java?view=diff&rev=537293&r1=537292&r2=537293
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java Fri May 11 14:14:23 2007
@@ -44,12 +44,6 @@
   private static final Text ROW_KEY =
     new Text(HGlobals.rootRegionInfo.regionName);
 
-  private void dumpRegion(HRegion r) throws IOException {
-    for(Iterator<HStore> i = r.stores.values().iterator(); i.hasNext(); ) {
-      i.next().dumpMaps();
-    }
-  }
-
   private void verifyGet(HRegion r) throws IOException {
 
     // This should return a value because there is only one family member
@@ -180,7 +174,6 @@
 
       // Read it back
 
-      dumpRegion(r);
       verifyGet(r);
 
       // Update one family member and add a new one
@@ -211,7 +204,6 @@
 
       // Read it back
 
-      dumpRegion(r);
       verifyGet(r);
 
       // Close region once and for all

Modified: lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java?view=diff&rev=537293&r1=537292&r2=537293
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java Fri May 11 14:14:23 2007
@@ -85,29 +85,30 @@
     conf = new HBaseConfiguration();
 
     Environment.getenv();
+    Logger rootLogger = Logger.getRootLogger();
     if(Environment.debugging) {
-      Logger rootLogger = Logger.getRootLogger();
       rootLogger.setLevel(Level.WARN);
+    }
+
+    ConsoleAppender consoleAppender = null;
+    for(Enumeration<Appender> e = (Enumeration<Appender>)rootLogger.getAllAppenders();
+        e.hasMoreElements();) {
 
-      ConsoleAppender consoleAppender = null;
-      for(Enumeration<Appender> e = (Enumeration<Appender>)rootLogger.getAllAppenders();
-          e.hasMoreElements();) {
-
-        Appender a = e.nextElement();
-        if(a instanceof ConsoleAppender) {
-          consoleAppender = (ConsoleAppender)a;
-          break;
-        }
+      Appender a = e.nextElement();
+      if(a instanceof ConsoleAppender) {
+        consoleAppender = (ConsoleAppender)a;
+        break;
       }
-      if(consoleAppender != null) {
-        Layout layout = consoleAppender.getLayout();
-        if(layout instanceof PatternLayout) {
-          PatternLayout consoleLayout = (PatternLayout)layout;
-          consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
-        }
+    }
+    if(consoleAppender != null) {
+      Layout layout = consoleAppender.getLayout();
+      if(layout instanceof PatternLayout) {
+        PatternLayout consoleLayout = (PatternLayout)layout;
+        consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
      }
-      Logger.getLogger("org.apache.hadoop.hbase").setLevel(Environment.logLevel);
     }
+    Logger.getLogger("org.apache.hadoop.hbase").setLevel(Environment.logLevel);
+
     cluster = new MiniHBaseCluster(conf, 1);
     client = new HClient(conf);
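For reference, the logging the test assembles piecewise above is roughly equivalent to the standalone setup below; the %t (thread) and %l (call site) conversion characters are what make interleaved scanner, flusher, and roller output attributable when chasing a deadlock. A sketch against the log4j 1.2 API, not part of the patch:

    import org.apache.log4j.ConsoleAppender;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.log4j.PatternLayout;

    // Standalone equivalent of the test's logging setup: quiet root logger,
    // verbose hbase logger, thread name and call site on every console line.
    public class LogSetupSketch {
      public static void main(String[] args) {
        Logger root = Logger.getRootLogger();
        root.addAppender(new ConsoleAppender(
            new PatternLayout("%d %-5p [%t] %l: %m%n")));
        root.setLevel(Level.WARN);
        Logger hbase = Logger.getLogger("org.apache.hadoop.hbase");
        hbase.setLevel(Level.DEBUG);
        hbase.debug("pattern check");
      }
    }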