Author: jimk Date: Thu Nov 29 13:10:03 2007 New Revision: 599578 URL: http://svn.apache.org/viewvc?rev=599578&view=rev Log: HADOOP-2295 Fix assigning a region to multiple servers
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/DFSAbort.java lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=599578&r1=599577&r2=599578&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original) +++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Thu Nov 29 13:10:03 2007 @@ -39,6 +39,7 @@ may not restart) HADOOP-2253 getRow can return HBASE::DELETEVAL cells (Bryan Duxbury via Stack) + HADOOP-2295 Fix assigning a region to multiple servers IMPROVEMENTS HADOOP-2401 Add convenience put method that takes writable Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=599578&r1=599577&r2=599578&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Thu Nov 29 13:10:03 2007 @@ -1136,12 +1136,16 @@ // Join up with all threads try { - rootScannerThread.join(); // Wait for the root scanner to finish. 
+ if (rootScannerThread.isAlive()) { + rootScannerThread.join(); // Wait for the root scanner to finish. + } } catch (Exception iex) { LOG.warn("root scanner", iex); } try { - metaScannerThread.join(); // Wait for meta scanner to finish. + if (metaScannerThread.isAlive()) { + metaScannerThread.join(); // Wait for meta scanner to finish. + } } catch(Exception iex) { LOG.warn("meta scanner", iex); } @@ -1460,10 +1464,25 @@ // Get reports on what the RegionServer did. for (int i = 0; i < incomingMsgs.length; i++) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received " + incomingMsgs[i].toString() + "from " + + serverName); + } HRegionInfo region = incomingMsgs[i].getRegionInfo(); switch (incomingMsgs[i].getMsg()) { + case HMsg.MSG_REPORT_PROCESS_OPEN: + synchronized (this.assignAttempts) { + // Region server has acknowledged request to open region. + // Extend region open time by 1/2 max region open time. + assignAttempts.put(region.getRegionName(), + Long.valueOf(assignAttempts.get( + region.getRegionName()).longValue() + + (this.maxRegionOpenTime / 2))); + } + break; + case HMsg.MSG_REPORT_OPEN: HRegionInfo regionInfo = unassignedRegions.get(region.getRegionName()); @@ -1484,9 +1503,7 @@ } else { LOG.info(info.getServerAddress().toString() + " serving " + region.getRegionName()); - // Remove from unassigned list so we don't assign it to someone else - this.unassignedRegions.remove(region.getRegionName()); - this.assignAttempts.remove(region.getRegionName()); + if (region.getRegionName().compareTo( HRegionInfo.rootRegionInfo.getRegionName()) == 0) { // Store the Root Region location (in memory) @@ -1495,21 +1512,23 @@ new HServerAddress(info.getServerAddress())); this.rootRegionLocation.notifyAll(); } - break; - } - - // Note that the table has been assigned and is waiting for the meta - // table to be updated. - - pendingRegions.add(region.getRegionName()); - - // Queue up an update to note the region location. 
- - try { - msgQueue.put(new ProcessRegionOpen(info, region)); - } catch (InterruptedException e) { - throw new RuntimeException("Putting into msgQueue was interrupted.", e); - } + } else { + // Note that the table has been assigned and is waiting for the meta + // table to be updated. + + pendingRegions.add(region.getRegionName()); + + // Queue up an update to note the region location. + + try { + msgQueue.put(new ProcessRegionOpen(info, region)); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); + } + } + // Remove from unassigned list so we don't assign it to someone else + this.unassignedRegions.remove(region.getRegionName()); + this.assignAttempts.remove(region.getRegionName()); } break; Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java?rev=599578&r1=599577&r2=599578&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java Thu Nov 29 13:10:03 2007 @@ -53,6 +53,9 @@ /** region server is no longer serving the specified region */ public static final byte MSG_REPORT_CLOSE = 101; + + /** region server is processing open request */ + public static final byte MSG_REPORT_PROCESS_OPEN = 102; /** * region server split the region associated with this message. 
@@ -140,6 +143,10 @@ case MSG_REGION_CLOSE_WITHOUT_REPORT: message.append("MSG_REGION_CLOSE_WITHOUT_REPORT : "); + break; + + case MSG_REPORT_PROCESS_OPEN: + message.append("MSG_REPORT_PROCESS_OPEN : "); break; case MSG_REPORT_OPEN: Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=599578&r1=599577&r2=599578&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Thu Nov 29 13:10:03 2007 @@ -742,6 +742,10 @@ throw new RuntimeException("Putting into msgQueue was " + "interrupted.", e); } + if (msgs[i].getMsg() == HMsg.MSG_REGION_OPEN) { + outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_PROCESS_OPEN, + msgs[i].getRegionInfo())); + } } } } @@ -982,11 +986,11 @@ * Presumption is that all closes and stops have already been called. 
*/ void join() { - join(this.workerThread); join(this.logRoller); join(this.cacheFlusher); join(this.compactor); join(this.splitter); + join(this.workerThread); } private void join(final Thread t) { @@ -1161,8 +1165,8 @@ } finally { this.lock.writeLock().unlock(); } + reportOpen(region); } - reportOpen(region); } void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted) Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java?rev=599578&r1=599577&r2=599578&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java Thu Nov 29 13:10:03 2007 @@ -108,11 +108,13 @@ public void close() { LOG.info(Thread.currentThread().getName() + " closing leases"); this.stop.set(true); - try { - this.leaseMonitorThread.interrupt(); - this.leaseMonitorThread.join(); - } catch (InterruptedException iex) { - // Ignore + while (this.leaseMonitorThread.isAlive()) { + try { + this.leaseMonitorThread.interrupt(); + this.leaseMonitorThread.join(); + } catch (InterruptedException iex) { + // Ignore + } } synchronized(leases) { synchronized(sortedLeases) { @@ -211,10 +213,16 @@ * Its a daemon thread. 
*/ class LeaseMonitor extends Chore { + /** + * @param p + * @param s + */ public LeaseMonitor(int p, AtomicBoolean s) { super(p, s); } + /** {@inheritDoc} */ + @Override protected void chore() { synchronized(leases) { synchronized(sortedLeases) { Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java?rev=599578&r1=599577&r2=599578&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java Thu Nov 29 13:10:03 2007 @@ -53,7 +53,9 @@ private final HMaster master; private final List<RegionServerThread> regionThreads; private final static int DEFAULT_NO = 1; + /** local mode */ public static final String LOCAL = "local"; + /** 'local:' */ public static final String LOCAL_COLON = LOCAL + ":"; private final HBaseConfiguration conf; @@ -146,12 +148,14 @@ public String waitOnRegionServer(int serverNumber) { RegionServerThread regionServerThread = this.regionThreads.remove(serverNumber); - try { - LOG.info("Waiting on " + - regionServerThread.getRegionServer().serverInfo.toString()); - regionServerThread.join(); - } catch (InterruptedException e) { - e.printStackTrace(); + while (regionServerThread.isAlive()) { + try { + LOG.info("Waiting on " + + regionServerThread.getRegionServer().serverInfo.toString()); + regionServerThread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } } return regionServerThread.getName(); } @@ -217,10 +221,12 @@ } } if (this.master != null) { - try { - this.master.join(); - } catch(InterruptedException e) { - // continue + while (this.master.isAlive()) { + try { + this.master.join(); + } catch(InterruptedException 
e) { + // continue + } } } LOG.info("Shutdown " + Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/DFSAbort.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/DFSAbort.java?rev=599578&r1=599577&r2=599578&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/DFSAbort.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/DFSAbort.java Thu Nov 29 13:10:03 2007 @@ -22,19 +22,10 @@ import junit.framework.TestSuite; import junit.textui.TestRunner; -import java.io.PrintWriter; -import org.apache.hadoop.util.ReflectionUtils; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** * Test ability of HBase to handle DFS failure */ public class DFSAbort extends HBaseClusterTestCase { - private static final Log LOG = - LogFactory.getLog(DFSAbort.class.getName()); - /** constructor */ public DFSAbort() { super(); @@ -66,8 +57,6 @@ // By now the Mini DFS is running, Mini HBase is running and we have // created a table. 
Now let's yank the rug out from HBase cluster.getDFSCluster().shutdown(); - // Now wait for Mini HBase Cluster to shut down -// cluster.join(); threadDumpingJoin(); } catch (Exception e) { e.printStackTrace(); Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java?rev=599578&r1=599577&r2=599578&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java Thu Nov 29 13:10:03 2007 @@ -127,33 +127,41 @@ this.server = cluster.getRegionThreads().get(0).getRegionServer(); this.log = server.getLog(); - + // When the META table can be opened, the region servers are running - @SuppressWarnings("unused") HTable meta = new HTable(conf, HConstants.META_TABLE_NAME); - // Create the test table and open it - HTableDescriptor desc = new HTableDescriptor(tableName); - desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString())); - HBaseAdmin admin = new HBaseAdmin(conf); - admin.createTable(desc); - HTable table = new HTable(conf, new Text(tableName)); + try { + + // Create the test table and open it + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString())); + HBaseAdmin admin = new HBaseAdmin(conf); + admin.createTable(desc); + HTable table = new HTable(conf, new Text(tableName)); + + try { + for (int i = 1; i <= 2048; i++) { // 2048 writes should cause 8 log rolls + long lockid = + table.startUpdate(new Text("row" + String.format("%1$04d", i))); + table.put(lockid, HConstants.COLUMN_FAMILY, value); + table.commit(lockid); + + if (i % 256 == 0) { + // After every 256 writes sleep to let the log 
roller run - for (int i = 1; i <= 2048; i++) { // 2048 writes should cause 8 log rolls - long lockid = - table.startUpdate(new Text("row" + String.format("%1$04d", i))); - table.put(lockid, HConstants.COLUMN_FAMILY, value); - table.commit(lockid); - - if (i % 256 == 0) { - // After every 256 writes sleep to let the log roller run - - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - // continue + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + // continue + } + } } + } finally { + table.close(); } + } finally { + meta.close(); } }