Author: jimk Date: Mon May 21 22:30:07 2007 New Revision: 540424 URL: http://svn.apache.org/viewvc?view=rev&rev=540424 Log: HADOOP-1397. Replace custom hbase locking with java.util.concurrent.locks.ReentrantLock
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?view=diff&rev=540424&r1=540423&r2=540424 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original) +++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Mon May 21 22:30:07 2007 @@ -6,3 +6,5 @@ 1. HADOOP-1384. HBase omnibus patch. (jimk, Vuk Ercegovac, and Michael Stack) 2. HADOOP-1402. Fix javadoc warnings in hbase contrib. (Michael Stack) 3. HADOOP-1404. HBase command-line shutdown failing (Michael Stack) + 4. HADOOP-1397. Replace custom hbase locking with + java.util.concurrent.locks.ReentrantLock (Michael Stack) Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?view=diff&rev=540424&r1=540423&r2=540424 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Mon May 21 22:30:07 2007 @@ -23,6 +23,7 @@ import java.io.*; import java.util.*; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * HRegion stores data for a certain region of a table. It stores all columns @@ -284,7 +285,7 @@ int maxUnflushedEntries = 0; int compactionThreshold = 0; - HLocking lock = null; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); ////////////////////////////////////////////////////////////////////////////// // Constructor @@ -322,8 +323,6 @@ this.writestate.writesOngoing = true; this.writestate.writesEnabled = true; this.writestate.closed = false; - - this.lock = new HLocking(); // Declare the regionName. This is a unique string for the region, used to // build a unique filename. @@ -401,7 +400,7 @@ * time-sensitive thread. */ public Vector<HStoreFile> close() throws IOException { - lock.obtainWriteLock(); + lock.writeLock().lock(); try { boolean shouldClose = false; synchronized(writestate) { @@ -441,7 +440,7 @@ } } } finally { - lock.releaseWriteLock(); + lock.writeLock().unlock(); } } @@ -617,7 +616,7 @@ * @return - true if the region should be split */ public boolean needsSplit(Text midKey) { - lock.obtainReadLock(); + lock.readLock().lock(); try { Text key = new Text(); @@ -635,7 +634,7 @@ return (maxSize > (DESIRED_MAX_FILE_SIZE + (DESIRED_MAX_FILE_SIZE / 2))); } finally { - lock.releaseReadLock(); + lock.readLock().unlock(); } } @@ -644,7 +643,7 @@ */ public boolean needsCompaction() { boolean needsCompaction = false; - lock.obtainReadLock(); + lock.readLock().lock(); try { for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) { if(i.next().getNMaps() > compactionThreshold) { @@ -653,7 +652,7 @@ } } } finally { - lock.releaseReadLock(); + lock.readLock().unlock(); } return needsCompaction; } @@ -673,7 +672,7 @@ */ public boolean compactStores() throws IOException { boolean shouldCompact = false; - lock.obtainReadLock(); + lock.readLock().lock(); try { synchronized(writestate) { if((! writestate.writesOngoing) @@ -686,7 +685,7 @@ } } } finally { - lock.releaseReadLock(); + lock.readLock().unlock(); } if(! shouldCompact) { @@ -694,7 +693,7 @@ return false; } else { - lock.obtainWriteLock(); + lock.writeLock().lock(); try { LOG.info("starting compaction on region " + this.regionInfo.regionName); for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) { @@ -710,7 +709,7 @@ recentCommits = 0; writestate.notifyAll(); } - lock.releaseWriteLock(); + lock.writeLock().unlock(); } } } @@ -931,7 +930,7 @@ private BytesWritable[] get(HStoreKey key, int numVersions) throws IOException { - lock.obtainReadLock(); + lock.readLock().lock(); try { // Check the memcache @@ -951,7 +950,7 @@ return targetStore.get(key, numVersions); } finally { - lock.releaseReadLock(); + lock.readLock().unlock(); } } @@ -968,7 +967,7 @@ public TreeMap<Text, BytesWritable> getFull(Text row) throws IOException { HStoreKey key = new HStoreKey(row, System.currentTimeMillis()); - lock.obtainReadLock(); + lock.readLock().lock(); try { TreeMap<Text, BytesWritable> memResult = memcache.getFull(key); for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) { @@ -979,7 +978,7 @@ return memResult; } finally { - lock.releaseReadLock(); + lock.readLock().unlock(); } } @@ -988,7 +987,7 @@ * columns. This Iterator must be closed by the caller. */ public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) throws IOException { - lock.obtainReadLock(); + lock.readLock().lock(); try { TreeSet<Text> families = new TreeSet<Text>(); for(int i = 0; i < cols.length; i++) { @@ -1004,7 +1003,7 @@ return new HScanner(cols, firstRow, memcache, storelist); } finally { - lock.releaseReadLock(); + lock.readLock().unlock(); } } @@ -1027,12 +1026,11 @@ // We obtain a per-row lock, so other clients will // block while one client performs an update. - lock.obtainReadLock(); + lock.readLock().lock(); try { return obtainLock(row); - } finally { - lock.releaseReadLock(); + lock.readLock().unlock(); } } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=540424&r1=540423&r2=540424 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Mon May 21 22:30:07 2007 @@ -25,6 +25,7 @@ import java.io.*; import java.util.*; +import java.util.concurrent.locks.ReentrantReadWriteLock; /******************************************************************************* * HRegionServer makes a set of HRegions available to clients. It checks in with @@ -50,7 +51,7 @@ private Configuration conf; private Random rand; private TreeMap<Text, HRegion> regions; // region name -> HRegion - private HLocking lock; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private Vector<HMsg> outboundMsgs; private long threadWakeFrequency; @@ -71,9 +72,12 @@ * @see org.apache.hadoop.hbase.RegionUnavailableListener#regionIsUnavailable(org.apache.hadoop.io.Text) */ public void regionIsUnavailable(Text regionName) { - lock.obtainWriteLock(); - regions.remove(regionName); - lock.releaseWriteLock(); + lock.writeLock().lock(); + try { + regions.remove(regionName); + } finally { + lock.writeLock().unlock(); + } } /* (non-Javadoc) @@ -88,11 +92,11 @@ // Grab a list of regions to check Vector<HRegion> regionsToCheck = new Vector<HRegion>(); - lock.obtainReadLock(); + lock.readLock().lock(); try { regionsToCheck.addAll(regions.values()); } finally { - lock.releaseReadLock(); + lock.readLock().unlock(); } try { @@ -163,10 +167,13 @@ // Finally, start serving the new regions - lock.obtainWriteLock(); - regions.put(newRegions[0].getRegionName(), newRegions[0]); - regions.put(newRegions[1].getRegionName(), newRegions[1]); - lock.releaseWriteLock(); + lock.writeLock().lock(); + try { + regions.put(newRegions[0].getRegionName(), newRegions[0]); + regions.put(newRegions[1].getRegionName(), newRegions[1]); + } finally { + lock.writeLock().unlock(); + } } } } @@ -214,12 +221,11 @@ // Grab a list of items to flush Vector<HRegion> toFlush = new Vector<HRegion>(); - lock.obtainReadLock(); + lock.readLock().lock(); try { toFlush.addAll(regions.values()); - } finally { - lock.releaseReadLock(); + lock.readLock().unlock(); } // Flush them, if necessary @@ -340,7 +346,6 @@ this.conf = conf; this.rand = new Random(); this.regions = new TreeMap<Text, HRegion>(); - this.lock = new HLocking(); this.outboundMsgs = new Vector<HMsg>(); this.scanners = Collections.synchronizedMap(new TreeMap<Text, HInternalScannerInterface>()); @@ -752,27 +757,26 @@ } private void openRegion(HRegionInfo regionInfo) throws IOException { - this.lock.obtainWriteLock(); + this.lock.writeLock().lock(); try { HRegion region = new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile); regions.put(region.getRegionName(), region); - reportOpen(region); - + reportOpen(region); } finally { - this.lock.releaseWriteLock(); + this.lock.writeLock().unlock(); } } private void closeRegion(HRegionInfo info, boolean reportWhenCompleted) throws IOException { - this.lock.obtainWriteLock(); + this.lock.writeLock().lock(); HRegion region = null; try { region = regions.remove(info.regionName); } finally { - this.lock.releaseWriteLock(); + this.lock.writeLock().unlock(); } if(region != null) { @@ -785,13 +789,12 @@ } private void closeAndDeleteRegion(HRegionInfo info) throws IOException { - this.lock.obtainWriteLock(); + this.lock.writeLock().lock(); HRegion region = null; try { region = regions.remove(info.regionName); - } finally { - this.lock.releaseWriteLock(); + this.lock.writeLock().unlock(); } if(region != null) { if(LOG.isDebugEnabled()) { @@ -809,13 +812,12 @@ /** Called either when the master tells us to restart or from stop() */ private void closeAllRegions() { Vector<HRegion> regionsToClose = new Vector<HRegion>(); - this.lock.obtainWriteLock(); + this.lock.writeLock().lock(); try { regionsToClose.addAll(regions.values()); regions.clear(); - } finally { - this.lock.releaseWriteLock(); + this.lock.writeLock().unlock(); } for(Iterator<HRegion> it = regionsToClose.iterator(); it.hasNext(); ) { HRegion region = it.next(); @@ -842,7 +844,7 @@ ****************************************************************************/ /* private void mergeRegions(Text regionNameA, Text regionNameB) throws IOException { - locking.obtainWriteLock(); + locking.writeLock().lock(); try { HRegion srcA = regions.remove(regionNameA); HRegion srcB = regions.remove(regionNameB); @@ -854,7 +856,7 @@ reportOpen(newRegion); } finally { - locking.releaseWriteLock(); + locking.writeLock().unlock(); } } */ @@ -1016,13 +1018,12 @@ /** Private utility method for safely obtaining an HRegion handle. */ private HRegion getRegion(Text regionName) throws NotServingRegionException { - this.lock.obtainReadLock(); + this.lock.readLock().lock(); HRegion region = null; try { region = regions.get(regionName); - } finally { - this.lock.releaseReadLock(); + this.lock.readLock().unlock(); } if(region == null) { Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?view=diff&rev=540424&r1=540423&r2=540424 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Mon May 21 22:30:07 2007 @@ -23,6 +23,7 @@ import java.util.Random; import java.util.TreeMap; import java.util.Vector; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -63,7 +64,7 @@ Integer compactLock = new Integer(0); Integer flushLock = new Integer(0); - HLocking lock = new HLocking(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>(); TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>(); @@ -238,7 +239,7 @@ /** Turn off all the MapFile readers */ public void close() throws IOException { - this.lock.obtainWriteLock(); + this.lock.writeLock().lock(); LOG.info("closing HStore for " + this.regionName + "/" + this.colFamily); try { @@ -252,7 +253,7 @@ LOG.info("HStore closed for " + this.regionName + "/" + this.colFamily); } finally { - this.lock.releaseWriteLock(); + this.lock.writeLock().unlock(); } } @@ -324,7 +325,7 @@ // C. Finally, make the new MapFile available. if(addToAvailableMaps) { - this.lock.obtainWriteLock(); + this.lock.writeLock().lock(); try { maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf)); @@ -335,7 +336,7 @@ } } finally { - this.lock.releaseWriteLock(); + this.lock.writeLock().unlock(); } } return getAllMapFiles(); @@ -343,12 +344,12 @@ } public Vector<HStoreFile> getAllMapFiles() { - this.lock.obtainReadLock(); + this.lock.readLock().lock(); try { return new Vector<HStoreFile>(mapFiles.values()); } finally { - this.lock.releaseReadLock(); + this.lock.readLock().unlock(); } } @@ -390,12 +391,12 @@ // Grab a list of files to compact. Vector<HStoreFile> toCompactFiles = null; - this.lock.obtainWriteLock(); + this.lock.writeLock().lock(); try { toCompactFiles = new Vector<HStoreFile>(mapFiles.values()); } finally { - this.lock.releaseWriteLock(); + this.lock.writeLock().unlock(); } // Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps @@ -630,7 +631,7 @@ // 1. Acquiring the write-lock - this.lock.obtainWriteLock(); + this.lock.writeLock().lock(); Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily); try { Path doneFile = new Path(curCompactStore, COMPACTION_DONE); @@ -748,7 +749,7 @@ // 7. Releasing the write-lock - this.lock.releaseWriteLock(); + this.lock.writeLock().unlock(); } } @@ -764,7 +765,7 @@ * The returned object should map column names to byte arrays (byte[]). */ public void getFull(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException { - this.lock.obtainReadLock(); + this.lock.readLock().lock(); try { MapFile.Reader[] maparray = maps.values().toArray(new MapFile.Reader[maps.size()]); @@ -793,7 +794,7 @@ } } finally { - this.lock.releaseReadLock(); + this.lock.readLock().unlock(); } } @@ -809,7 +810,7 @@ } Vector<BytesWritable> results = new Vector<BytesWritable>(); - this.lock.obtainReadLock(); + this.lock.readLock().lock(); try { MapFile.Reader[] maparray = maps.values().toArray(new MapFile.Reader[maps.size()]); @@ -850,7 +851,7 @@ } } finally { - this.lock.releaseReadLock(); + this.lock.readLock().unlock(); } } @@ -866,7 +867,7 @@ return maxSize; } - this.lock.obtainReadLock(); + this.lock.readLock().lock(); try { long mapIndex = 0L; @@ -893,7 +894,7 @@ LOG.warn(e); } finally { - this.lock.releaseReadLock(); + this.lock.readLock().unlock(); } return maxSize; } @@ -902,12 +903,12 @@ * @return Returns the number of map files currently in use */ public int getNMaps() { - this.lock.obtainReadLock(); + this.lock.readLock().lock(); try { return maps.size(); } finally { - this.lock.releaseReadLock(); + this.lock.readLock().unlock(); } } @@ -949,7 +950,7 @@ super(timestamp, targetCols); - lock.obtainReadLock(); + lock.readLock().lock(); try { this.readers = new MapFile.Reader[mapFiles.size()]; @@ -1064,7 +1065,7 @@ } } finally { - lock.releaseReadLock(); + lock.readLock().unlock(); scannerClosed = true; } }