Author: jimk Date: Wed May 23 14:30:25 2007 New Revision: 541095 URL: http://svn.apache.org/viewvc?view=rev&rev=541095 Log: HADOOP-1424. TestHBaseCluster fails with IllegalMonitorStateException. Fix regression introduced by HADOOP-1397.
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?view=diff&rev=541095&r1=541094&r2=541095 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original) +++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Wed May 23 14:30:25 2007 @@ -14,3 +14,5 @@ 'Performance Evaluation', etc. 7. HADOOP-1420, HADOOP-1423. Findbugs changes, remove reference to removed class HLocking. + 8. HADOOP-1424. TestHBaseCluster fails with IllegalMonitorStateException. Fix + regression introduced by HADOOP-1397. Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java?view=auto&rev=541095 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java (added) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java Wed May 23 14:30:25 2007 @@ -0,0 +1,101 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * HLocking is a set of lock primitives that does not rely on a + * particular thread holding the monitor for an object. This is + * especially important when a lock must persist over multiple RPC's + * since there is no guarantee that the same Server thread will handle + * all the RPC's until the lock is released. Not requiring that the locker + * thread is same as unlocking thread is the key distinction between this + * class and [EMAIL PROTECTED] java.util.concurrent.locks.ReentrantReadWriteLock}. + * + * <p>For each independent entity that needs locking, create a new HLocking + * instance. + */ +public class HLocking { + private Integer mutex; + + // If lockers == 0, the lock is unlocked + // If lockers > 0, locked for read + // If lockers == -1 locked for write + + private AtomicInteger lockers; + + /** Constructor */ + public HLocking() { + this.mutex = new Integer(0); + this.lockers = new AtomicInteger(0); + } + + /** + * Caller needs the nonexclusive read-lock + */ + public void obtainReadLock() { + synchronized(mutex) { + while(lockers.get() < 0) { + try { + mutex.wait(); + } catch(InterruptedException ie) { + } + } + lockers.incrementAndGet(); + mutex.notifyAll(); + } + } + + /** + * Caller is finished with the nonexclusive read-lock + */ + public void releaseReadLock() { + synchronized(mutex) { + if(lockers.decrementAndGet() < 0) { + throw new IllegalStateException("lockers: " + lockers); + } + mutex.notifyAll(); + } + } + + /** + * Caller needs the exclusive write-lock + */ + public void obtainWriteLock() { + synchronized(mutex) { + while(!lockers.compareAndSet(0, -1)) { + try { + mutex.wait(); + } catch (InterruptedException ie) { + } + } + mutex.notifyAll(); + } + } + + /** + * Caller is finished with the write lock + */ + public void releaseWriteLock() { + synchronized(mutex) { + if(!lockers.compareAndSet(-1, 0)) { + throw new IllegalStateException("lockers: " + lockers); + } + mutex.notifyAll(); + } + } +} Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?view=diff&rev=541095&r1=541094&r2=541095 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Wed May 23 14:30:25 2007 @@ -15,14 +15,17 @@ */ package org.apache.hadoop.hbase; -import org.apache.hadoop.io.*; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.Vector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - -import java.io.*; -import java.util.*; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; /******************************************************************************* * The HMemcache holds in-memory modifications to the HRegion. This is really a @@ -39,7 +42,7 @@ TreeMap<HStoreKey, BytesWritable> snapshot = null; - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final HLocking lock = new HLocking(); public HMemcache() { super(); @@ -70,7 +73,7 @@ public Snapshot snapshotMemcacheForLog(HLog log) throws IOException { Snapshot retval = new Snapshot(); - this.lock.writeLock().lock(); + this.lock.obtainWriteLock(); try { if(snapshot != null) { throw new IOException("Snapshot in progress!"); @@ -99,7 +102,7 @@ return retval; } finally { - this.lock.writeLock().unlock(); + this.lock.releaseWriteLock(); } } @@ -109,7 +112,7 @@ * Modifying the structure means we need to obtain a writelock. */ public void deleteSnapshot() throws IOException { - this.lock.writeLock().lock(); + this.lock.obtainWriteLock(); try { if(snapshot == null) { @@ -135,7 +138,7 @@ } } finally { - this.lock.writeLock().unlock(); + this.lock.releaseWriteLock(); } } @@ -145,14 +148,14 @@ * Operation uses a write lock. */ public void add(Text row, TreeMap<Text, BytesWritable> columns, long timestamp) { - this.lock.writeLock().lock(); + this.lock.obtainWriteLock(); try { for (Map.Entry<Text, BytesWritable> es: columns.entrySet()) { HStoreKey key = new HStoreKey(row, es.getKey(), timestamp); memcache.put(key, es.getValue()); } } finally { - this.lock.writeLock().unlock(); + this.lock.releaseWriteLock(); } } @@ -163,7 +166,7 @@ */ public BytesWritable[] get(HStoreKey key, int numVersions) { Vector<BytesWritable> results = new Vector<BytesWritable>(); - this.lock.readLock().lock(); + this.lock.obtainReadLock(); try { Vector<BytesWritable> result = get(memcache, key, numVersions-results.size()); results.addAll(0, result); @@ -180,7 +183,7 @@ return (results.size() == 0)? null: results.toArray(new BytesWritable[results.size()]); } finally { - this.lock.readLock().unlock(); + this.lock.releaseReadLock(); } } @@ -192,7 +195,7 @@ */ public TreeMap<Text, BytesWritable> getFull(HStoreKey key) { TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>(); - this.lock.readLock().lock(); + this.lock.obtainReadLock(); try { internalGetFull(memcache, key, results); for(int i = history.size()-1; i >= 0; i--) { @@ -202,7 +205,7 @@ return results; } finally { - this.lock.readLock().unlock(); + this.lock.releaseReadLock(); } } @@ -275,7 +278,7 @@ super(timestamp, targetCols); - lock.readLock().lock(); + lock.obtainReadLock(); try { this.backingMaps = new TreeMap[history.size() + 1]; @@ -367,7 +370,7 @@ } } finally { - lock.readLock().unlock(); + lock.releaseReadLock(); scannerClosed = true; } } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?view=diff&rev=541095&r1=541094&r2=541095 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Wed May 23 14:30:25 2007 @@ -23,7 +23,6 @@ import java.io.*; import java.util.*; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** * HRegion stores data for a certain region of a table. It stores all columns @@ -283,7 +282,7 @@ int maxUnflushedEntries = 0; int compactionThreshold = 0; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final HLocking lock = new HLocking(); ////////////////////////////////////////////////////////////////////////////// // Constructor @@ -398,7 +397,7 @@ * time-sensitive thread. */ public Vector<HStoreFile> close() throws IOException { - lock.writeLock().lock(); + lock.obtainWriteLock(); try { boolean shouldClose = false; synchronized(writestate) { @@ -438,7 +437,7 @@ } } } finally { - lock.writeLock().unlock(); + lock.releaseWriteLock(); } } @@ -614,7 +613,7 @@ * @return - true if the region should be split */ public boolean needsSplit(Text midKey) { - lock.readLock().lock(); + lock.obtainReadLock(); try { Text key = new Text(); @@ -632,7 +631,7 @@ return (maxSize > (DESIRED_MAX_FILE_SIZE + (DESIRED_MAX_FILE_SIZE / 2))); } finally { - lock.readLock().unlock(); + lock.releaseReadLock(); } } @@ -641,7 +640,7 @@ */ public boolean needsCompaction() { boolean needsCompaction = false; - lock.readLock().lock(); + lock.obtainReadLock(); try { for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) { if(i.next().getNMaps() > compactionThreshold) { @@ -650,7 +649,7 @@ } } } finally { - lock.readLock().unlock(); + lock.releaseReadLock(); } return needsCompaction; } @@ -670,7 +669,7 @@ */ public boolean compactStores() throws IOException { boolean shouldCompact = false; - lock.readLock().lock(); + lock.obtainReadLock(); try { synchronized(writestate) { if((! writestate.writesOngoing) @@ -683,32 +682,30 @@ } } } finally { - lock.readLock().unlock(); + lock.releaseReadLock(); } if(! shouldCompact) { LOG.info("not compacting region " + this.regionInfo.regionName); - return false; - - } else { - lock.writeLock().lock(); - try { - LOG.info("starting compaction on region " + this.regionInfo.regionName); - for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) { - HStore store = it.next(); - store.compact(); - } - LOG.info("compaction completed on region " + this.regionInfo.regionName); - return true; - - } finally { - synchronized(writestate) { - writestate.writesOngoing = false; - recentCommits = 0; - writestate.notifyAll(); - } - lock.writeLock().unlock(); + return false; + } + lock.obtainWriteLock(); + try { + LOG.info("starting compaction on region " + this.regionInfo.regionName); + for (Iterator<HStore> it = stores.values().iterator(); it.hasNext();) { + HStore store = it.next(); + store.compact(); + } + LOG.info("compaction completed on region " + this.regionInfo.regionName); + return true; + + } finally { + synchronized (writestate) { + writestate.writesOngoing = false; + recentCommits = 0; + writestate.notifyAll(); } + lock.releaseWriteLock(); } } @@ -928,7 +925,7 @@ private BytesWritable[] get(HStoreKey key, int numVersions) throws IOException { - lock.readLock().lock(); + lock.obtainReadLock(); try { // Check the memcache @@ -948,7 +945,7 @@ return targetStore.get(key, numVersions); } finally { - lock.readLock().unlock(); + lock.releaseReadLock(); } } @@ -965,7 +962,7 @@ public TreeMap<Text, BytesWritable> getFull(Text row) throws IOException { HStoreKey key = new HStoreKey(row, System.currentTimeMillis()); - lock.readLock().lock(); + lock.obtainReadLock(); try { TreeMap<Text, BytesWritable> memResult = memcache.getFull(key); for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) { @@ -976,7 +973,7 @@ return memResult; } finally { - lock.readLock().unlock(); + lock.releaseReadLock(); } } @@ -985,7 +982,7 @@ * columns. This Iterator must be closed by the caller. */ public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) throws IOException { - lock.readLock().lock(); + lock.obtainReadLock(); try { TreeSet<Text> families = new TreeSet<Text>(); for(int i = 0; i < cols.length; i++) { @@ -1001,7 +998,7 @@ return new HScanner(cols, firstRow, memcache, storelist); } finally { - lock.readLock().unlock(); + lock.releaseReadLock(); } } @@ -1024,11 +1021,11 @@ // We obtain a per-row lock, so other clients will // block while one client performs an update. - lock.readLock().lock(); + lock.obtainReadLock(); try { return obtainLock(row); } finally { - lock.readLock().unlock(); + lock.releaseReadLock(); } } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?view=diff&rev=541095&r1=541094&r2=541095 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Wed May 23 14:30:25 2007 @@ -23,7 +23,6 @@ import java.util.Random; import java.util.TreeMap; import java.util.Vector; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -64,7 +63,7 @@ Integer compactLock = 0; Integer flushLock = 0; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final HLocking lock = new HLocking(); TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>(); TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>(); @@ -237,7 +236,7 @@ /** Turn off all the MapFile readers */ public void close() throws IOException { LOG.info("closing HStore for " + this.regionName + "/" + this.colFamily); - this.lock.writeLock().lock(); + this.lock.obtainWriteLock(); try { for (MapFile.Reader map: maps.values()) { map.close(); @@ -247,7 +246,7 @@ LOG.info("HStore closed for " + this.regionName + "/" + this.colFamily); } finally { - this.lock.writeLock().unlock(); + this.lock.releaseWriteLock(); } } @@ -319,7 +318,7 @@ // C. Finally, make the new MapFile available. if(addToAvailableMaps) { - this.lock.writeLock().lock(); + this.lock.obtainWriteLock(); try { maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf)); @@ -330,7 +329,7 @@ } } finally { - this.lock.writeLock().unlock(); + this.lock.releaseWriteLock(); } } return getAllMapFiles(); @@ -338,12 +337,12 @@ } public Vector<HStoreFile> getAllMapFiles() { - this.lock.readLock().lock(); + this.lock.obtainReadLock(); try { return new Vector<HStoreFile>(mapFiles.values()); } finally { - this.lock.readLock().unlock(); + this.lock.releaseReadLock(); } } @@ -385,12 +384,12 @@ // Grab a list of files to compact. Vector<HStoreFile> toCompactFiles = null; - this.lock.writeLock().lock(); + this.lock.obtainWriteLock(); try { toCompactFiles = new Vector<HStoreFile>(mapFiles.values()); } finally { - this.lock.writeLock().unlock(); + this.lock.releaseWriteLock(); } // Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps @@ -627,7 +626,7 @@ Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily); - this.lock.writeLock().lock(); + this.lock.obtainWriteLock(); try { Path doneFile = new Path(curCompactStore, COMPACTION_DONE); if(! fs.exists(doneFile)) { @@ -744,7 +743,7 @@ // 7. Releasing the write-lock - this.lock.writeLock().unlock(); + this.lock.releaseWriteLock(); } } @@ -760,7 +759,7 @@ * The returned object should map column names to byte arrays (byte[]). */ public void getFull(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException { - this.lock.readLock().lock(); + this.lock.obtainReadLock(); try { MapFile.Reader[] maparray = maps.values().toArray(new MapFile.Reader[maps.size()]); @@ -789,7 +788,7 @@ } } finally { - this.lock.readLock().unlock(); + this.lock.releaseReadLock(); } } @@ -805,7 +804,7 @@ } Vector<BytesWritable> results = new Vector<BytesWritable>(); - this.lock.readLock().lock(); + this.lock.obtainReadLock(); try { MapFile.Reader[] maparray = maps.values().toArray(new MapFile.Reader[maps.size()]); @@ -846,7 +845,7 @@ } } finally { - this.lock.readLock().unlock(); + this.lock.releaseReadLock(); } } @@ -862,7 +861,7 @@ return maxSize; } - this.lock.readLock().lock(); + this.lock.obtainReadLock(); try { long mapIndex = 0L; @@ -889,7 +888,7 @@ LOG.warn(e); } finally { - this.lock.readLock().unlock(); + this.lock.releaseReadLock(); } return maxSize; } @@ -898,12 +897,12 @@ * @return Returns the number of map files currently in use */ public int getNMaps() { - this.lock.readLock().lock(); + this.lock.obtainReadLock(); try { return maps.size(); } finally { - this.lock.readLock().unlock(); + this.lock.releaseReadLock(); } } @@ -945,7 +944,7 @@ super(timestamp, targetCols); - lock.readLock().lock(); + lock.obtainReadLock(); try { this.readers = new MapFile.Reader[mapFiles.size()]; @@ -1060,7 +1059,7 @@ } } finally { - lock.readLock().unlock(); + lock.releaseReadLock(); scannerClosed = true; } }