Author: jimk Date: Tue Jul 17 19:26:03 2007 New Revision: 557118 URL: http://svn.apache.org/viewvc?view=rev&rev=557118 Log: HADOOP-1615 Replacing thread notification-based queue with java.util.concurrent.BlockingQueue in HMaster, HRegionServer
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?view=diff&rev=557118&r1=557117&r2=557118 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original) +++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Tue Jul 17 19:26:03 2007 @@ -65,4 +65,5 @@ 41. HADOOP-1614 [hbase] HClient does not protect itself from simultaneous updates 42. HADOOP-1468 Add HBase batch update to reduce RPC overhead 43. HADOOP-1616 Sporadic TestTable failures - + 44. HADOOP-1615 Replacing thread notification-based queue with + java.util.concurrent.BlockingQueue in HMaster, HRegionServer Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=diff&rev=557118&r1=557117&r2=557118 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Tue Jul 17 19:26:03 2007 @@ -26,7 +26,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.Map; import java.util.Random; import java.util.Set; @@ -37,6 +36,9 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.Vector; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -86,7 +88,7 @@ int numRetries; long maxRegionOpenTime; - LinkedList<PendingOperation> msgQueue; + BlockingQueue<PendingOperation> msgQueue; private Leases serverLeases; private Server server; @@ -636,7 +638,7 @@ this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); this.numRetries = conf.getInt("hbase.client.retries.number", 2); this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000); - this.msgQueue = new LinkedList<PendingOperation>(); + this.msgQueue = new LinkedBlockingQueue<PendingOperation>(); this.serverLeases = new Leases( conf.getLong("hbase.master.lease.period", 30 * 1000), conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000)); @@ -736,18 +738,13 @@ // Main processing loop for (PendingOperation op = null; !closed; ) { - synchronized(msgQueue) { - while(msgQueue.size() == 0 && !closed) { - try { - msgQueue.wait(threadWakeFrequency); - } catch(InterruptedException iex) { - // continue - } - } - if(closed) { - continue; - } - op = msgQueue.removeFirst(); + try { + op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // continue + } + if (op == null || closed) { + continue; } try { if (LOG.isDebugEnabled()) { @@ -765,8 +762,10 @@ } } LOG.warn(ex); - synchronized(msgQueue) { - msgQueue.addLast(op); + try { + msgQueue.put(op); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } } @@ -874,10 +873,11 @@ // name, then we can timeout the old one right away and register // the new one. storedInfo = serversToServerInfo.remove(s); - if(storedInfo != null && !closed) { - synchronized(msgQueue) { - msgQueue.addLast(new PendingServerShutdown(storedInfo)); - msgQueue.notifyAll(); + if (storedInfo != null && !closed) { + try { + msgQueue.put(new PendingServerShutdown(storedInfo)); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } @@ -1064,9 +1064,10 @@ // Queue up an update to note the region location. - synchronized(msgQueue) { - msgQueue.addLast(new PendingOpenReport(info, region)); - msgQueue.notifyAll(); + try { + msgQueue.put(new PendingOpenReport(info, region)); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } break; @@ -1097,9 +1098,10 @@ unassignedRegions.remove(region.regionName); assignAttempts.remove(region.regionName); - synchronized(msgQueue) { - msgQueue.addLast(new PendingCloseReport(region, reassignRegion, deleteRegion)); - msgQueue.notifyAll(); + try { + msgQueue.put(new PendingCloseReport(region, reassignRegion, deleteRegion)); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } // NOTE: we cannot put the region into unassignedRegions as that @@ -2406,9 +2408,10 @@ HGlobals.rootRegionInfo); assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L); } - synchronized(msgQueue) { - msgQueue.addLast(new PendingServerShutdown(storedInfo)); - msgQueue.notifyAll(); + try { + msgQueue.put(new PendingServerShutdown(storedInfo)); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=557118&r1=557117&r2=557118 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Tue Jul 17 19:26:03 2007 @@ -24,12 +24,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedList; import java.util.Map; import java.util.Random; import java.util.SortedMap; import java.util.TreeMap; import java.util.Vector; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; @@ -449,7 +451,7 @@ this.splitOrCompactCheckerThread = new Thread(splitOrCompactChecker); // Process requests from Master - this.toDo = new LinkedList<ToDoEntry>(); + this.toDo = new LinkedBlockingQueue<ToDoEntry>(); this.worker = new Worker(); this.workerThread = new Thread(worker); @@ -661,7 +663,11 @@ if (LOG.isDebugEnabled()) { LOG.debug("Got default message"); } - toDo.addLast(new ToDoEntry(msgs[i])); + try { + toDo.put(new ToDoEntry(msgs[i])); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); + } } } @@ -828,7 +834,7 @@ this.msg = msg; } } - LinkedList<ToDoEntry> toDo; + BlockingQueue<ToDoEntry> toDo; private Worker worker; private Thread workerThread; /** Thread that performs long running requests from the master */ @@ -844,26 +850,14 @@ */ public void run() { for(ToDoEntry e = null; !stopRequested; ) { - synchronized(toDo) { - while(toDo.size() == 0 && !stopRequested) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Wait on todo"); - } - toDo.wait(threadWakeFrequency); - if (LOG.isDebugEnabled()) { - LOG.debug("Wake on todo"); - } - } catch(InterruptedException ex) { - // continue - } - } - if(stopRequested) { - continue; - } - e = toDo.removeFirst(); + try { + e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + // continue + } + if(e == null || stopRequested) { + continue; } - try { if (LOG.isDebugEnabled()) { LOG.debug(e.msg.toString()); @@ -900,8 +894,10 @@ if(e.tries < numRetries) { LOG.warn(ie); e.tries++; - synchronized(toDo) { - toDo.addLast(e); + try { + toDo.put(e); + } catch (InterruptedException ex) { + throw new RuntimeException("Putting into msgQueue was interrupted.", ex); } } else { LOG.error("unable to process message: " + e.msg.toString(), ie);