Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Sat Nov 24 23:17:38 2007 @@ -26,15 +26,19 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; +import java.util.ConcurrentModificationException; import java.util.HashMap; -import java.util.List; -import java.util.ListIterator; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.Vector; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.DelayQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -125,27 +129,79 @@ /** region server process name */ public static final String REGIONSERVER = "regionserver"; + /** Queue entry passed to flusher, compactor and splitter threads */ + class QueueEntry implements Delayed { + private final HRegion region; + private long expirationTime; + + QueueEntry(HRegion region, long expirationTime) { + this.region = region; + this.expirationTime = expirationTime; + } + + /** [EMAIL PROTECTED] */ + @Override + public boolean equals(Object o) { + QueueEntry other = (QueueEntry) o; + return this.hashCode() == other.hashCode(); + } + + /** [EMAIL PROTECTED] */ + @Override + public int hashCode() { + return this.region.getRegionInfo().hashCode(); + } + + /** [EMAIL PROTECTED] 
*/ + public long getDelay(TimeUnit unit) { + return unit.convert(this.expirationTime - System.currentTimeMillis(), + TimeUnit.MILLISECONDS); + } + + /** [EMAIL PROTECTED] */ + public int compareTo(Delayed o) { + long delta = this.getDelay(TimeUnit.MILLISECONDS) - + o.getDelay(TimeUnit.MILLISECONDS); + + int value = 0; + if (delta > 0) { + value = 1; + + } else if (delta < 0) { + value = -1; + } + return value; + } + + /** @return the region */ + public HRegion getRegion() { + return region; + } + + /** @param expirationTime the expirationTime to set */ + public void setExpirationTime(long expirationTime) { + this.expirationTime = expirationTime; + } + } + // Check to see if regions should be split - private final Thread splitOrCompactCheckerThread; + final Splitter splitter; // Needed at shutdown. On way out, if can get this lock then we are not in // middle of a split or compaction: i.e. splits/compactions cannot be // interrupted. - protected final Integer splitOrCompactLock = new Integer(0); + final Integer splitterLock = new Integer(0); - /* - * Runs periodically to determine if regions need to be compacted or split - */ - class SplitOrCompactChecker extends Chore - implements RegionUnavailableListener { + /** Split regions on request */ + class Splitter extends Thread implements RegionUnavailableListener { + private final BlockingQueue<QueueEntry> splitQueue = + new LinkedBlockingQueue<QueueEntry>(); + private HTable root = null; private HTable meta = null; - /** - * @param stop - */ - public SplitOrCompactChecker(final AtomicBoolean stop) { - super(conf.getInt("hbase.regionserver.thread.splitcompactcheckfrequency", - 30 * 1000), stop); + /** constructor */ + public Splitter() { + super(); } /** [EMAIL PROTECTED] */ @@ -178,35 +234,50 @@ } /** - * Scan for splits or compactions to run. Run any we find. 
+ * Perform region splits if necessary */ @Override - protected void chore() { - // Don't interrupt us while we're working - synchronized (splitOrCompactLock) { - checkForSplitsOrCompactions(); - } - } - - private void checkForSplitsOrCompactions() { - // Grab a list of regions to check - List<HRegion> nonClosedRegionsToCheck = getRegionsToCheck(); - for(HRegion cur: nonClosedRegionsToCheck) { + public void run() { + while (!stopRequested.get()) { + QueueEntry e = null; try { - if (cur.compactIfNeeded()) { - // After compaction, it probably needs splitting. May also need - // splitting just because one of the memcache flushes was big. - split(cur); - } + e = splitQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + + } catch (InterruptedException ex) { + continue; + } + if (e == null) { + continue; + } + synchronized (splitterLock) { // Don't interrupt us while we're working + try { + split(e.getRegion()); + + } catch (IOException ex) { + LOG.error("Split failed for region " + + e.getRegion().getRegionName(), + RemoteExceptionHandler.checkIOException(ex)); + if (!checkFileSystem()) { + break; + } - } catch(IOException e) { - //TODO: What happens if this fails? Are we toast? - LOG.error("Split or compaction failed", e); - if (!checkFileSystem()) { - break; + } catch (Exception ex) { + LOG.error("Split failed on region " + + e.getRegion().getRegionName(), ex); + if (!checkFileSystem()) { + break; + } } } } + LOG.info(getName() + " exiting"); + } + + /** + * @param e entry indicating which region needs to be split + */ + public void splitRequested(QueueEntry e) { + splitQueue.add(e); } private void split(final HRegion region) throws IOException { @@ -271,100 +342,240 @@ } } - // Cache flushing - private final Thread cacheFlusherThread; + // Compactions + final Compactor compactor; // Needed during shutdown so we send an interrupt after completion of a - // flush, not in the midst. 
- protected final Integer cacheFlusherLock = new Integer(0); - - /* Runs periodically to flush memcache. - */ - class Flusher extends Chore { - /** - * @param period - * @param stop - */ - public Flusher(final int period, final AtomicBoolean stop) { - super(period, stop); + // compaction, not in the midst. + final Integer compactionLock = new Integer(0); + + /** Compact region on request */ + class Compactor extends Thread { + private final BlockingQueue<QueueEntry> compactionQueue = + new LinkedBlockingQueue<QueueEntry>(); + + /** constructor */ + public Compactor() { + super(); } + /** {@inheritDoc} */ @Override - protected void chore() { - synchronized(cacheFlusherLock) { - checkForFlushesToRun(); - } - } - - private void checkForFlushesToRun() { - // Grab a list of items to flush - List<HRegion> nonClosedRegionsToFlush = getRegionsToCheck(); - // Flush them, if necessary - for(HRegion cur: nonClosedRegionsToFlush) { + public void run() { + while (!stopRequested.get()) { + QueueEntry e = null; + try { + e = compactionQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + + } catch (InterruptedException ex) { + continue; + } + if (e == null) { + continue; + } try { - cur.flushcache(); - } catch (DroppedSnapshotException e) { - // Cache flush can fail in a few places. If it fails in a critical - // section, we get a DroppedSnapshotException and a replay of hlog - // is required. Currently the only way to do this is a restart of - // the server. - LOG.fatal("Replay of hlog required. 
Forcing server restart", e); + if (e.getRegion().compactIfNeeded()) { + splitter.splitRequested(e); + } + + } catch (IOException ex) { + LOG.error("Compaction failed for region " + + e.getRegion().getRegionName(), + RemoteExceptionHandler.checkIOException(ex)); if (!checkFileSystem()) { break; } - HRegionServer.this.stop(); - } catch (IOException iex) { - LOG.error("Cache flush failed", - RemoteExceptionHandler.checkIOException(iex)); + + } catch (Exception ex) { + LOG.error("Compaction failed for region " + + e.getRegion().getRegionName(), ex); if (!checkFileSystem()) { break; } } } + LOG.info(getName() + " exiting"); + } + + /** + * @param e QueueEntry for region to be compacted + */ + public void compactionRequested(QueueEntry e) { + compactionQueue.add(e); + } + } + + // Cache flushing + final Flusher cacheFlusher; + // Needed during shutdown so we send an interrupt after completion of a + // flush, not in the midst. + final Integer cacheFlusherLock = new Integer(0); + + /** Flush cache upon request */ + class Flusher extends Thread implements CacheFlushListener { + private final DelayQueue<QueueEntry> flushQueue = + new DelayQueue<QueueEntry>(); + + private final long optionalFlushPeriod; + + /** constructor */ + public Flusher() { + super(); + this.optionalFlushPeriod = conf.getLong( + "hbase.regionserver.optionalcacheflushinterval", 60L * 1000L); + + } + + /** [EMAIL PROTECTED] */ + @Override + public void run() { + while (!stopRequested.get()) { + QueueEntry e = null; + try { + e = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + + } catch (InterruptedException ex) { + continue; + + } catch (ConcurrentModificationException ex) { + continue; + + } + synchronized(cacheFlusherLock) { // Don't interrupt while we're working + if (e != null) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("flushing region " + e.getRegion().getRegionName()); + } + if (e.getRegion().flushcache()) { + compactor.compactionRequested(e); + } + + } catch 
(DroppedSnapshotException ex) { + // Cache flush can fail in a few places. If it fails in a critical + // section, we get a DroppedSnapshotException and a replay of hlog + // is required. Currently the only way to do this is a restart of + // the server. + LOG.fatal("Replay of hlog required. Forcing server restart", ex); + if (!checkFileSystem()) { + break; + } + HRegionServer.this.stop(); + + } catch (IOException ex) { + LOG.error("Cache flush failed for region " + + e.getRegion().getRegionName(), + RemoteExceptionHandler.checkIOException(ex)); + if (!checkFileSystem()) { + break; + } + + } catch (Exception ex) { + LOG.error("Cache flush failed for region " + + e.getRegion().getRegionName(), ex); + if (!checkFileSystem()) { + break; + } + } + e.setExpirationTime(System.currentTimeMillis() + + optionalFlushPeriod); + flushQueue.add(e); + } + + // Now insure that all the active regions are in the queue + + Set<HRegion> regions = getRegionsToCheck(); + for (HRegion r: regions) { + e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod); + synchronized (flushQueue) { + if (!flushQueue.contains(e)) { + flushQueue.add(e); + } + } + } + + // Now make sure that the queue only contains active regions + + synchronized (flushQueue) { + for (Iterator<QueueEntry> i = flushQueue.iterator(); i.hasNext(); ) { + e = i.next(); + if (!regions.contains(e.getRegion())) { + i.remove(); + } + } + } + } + } + flushQueue.clear(); + LOG.info(getName() + " exiting"); + } + + /** [EMAIL PROTECTED] */ + public void flushRequested(HRegion region) { + QueueEntry e = new QueueEntry(region, System.currentTimeMillis()); + synchronized (flushQueue) { + if (flushQueue.contains(e)) { + flushQueue.remove(e); + } + flushQueue.add(e); + } } } // HLog and HLog roller. 
log is protected rather than private to avoid // eclipse warning when accessed by inner classes protected HLog log; - private final Thread logRollerThread; - protected final Integer logRollerLock = new Integer(0); + final LogRoller logRoller; + final Integer logRollerLock = new Integer(0); /** Runs periodically to determine if the HLog should be rolled */ - class LogRoller extends Chore { - private int MAXLOGENTRIES = - conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000); + class LogRoller extends Thread implements LogRollListener { + private volatile boolean rollLog; - /** - * @param period - * @param stop - */ - public LogRoller(final int period, final AtomicBoolean stop) { - super(period, stop); + /** constructor */ + public LogRoller() { + super(); + this.rollLog = false; } /** {@inheritDoc} */ @Override - protected void chore() { - synchronized(logRollerLock) { - checkForLogRoll(); - } - } - - private void checkForLogRoll() { - // If the number of log entries is high enough, roll the log. This - // is a very fast operation, but should not be done too frequently. - int nEntries = log.getNumEntries(); - if(nEntries > this.MAXLOGENTRIES) { + public synchronized void run() { + while (!stopRequested.get()) { try { - LOG.info("Rolling hlog. Number of entries: " + nEntries); - log.rollWriter(); - } catch (IOException iex) { - LOG.error("Log rolling failed", - RemoteExceptionHandler.checkIOException(iex)); - checkFileSystem(); + this.wait(threadWakeFrequency); + + } catch (InterruptedException e) { + continue; + } + if (!rollLog) { + continue; + } + synchronized (logRollerLock) { + try { + LOG.info("Rolling hlog. 
Number of entries: " + log.getNumEntries()); + log.rollWriter(); + + } catch (IOException ex) { + LOG.error("Log rolling failed", + RemoteExceptionHandler.checkIOException(ex)); + checkFileSystem(); + + } catch (Exception ex) { + LOG.error("Log rolling failed", ex); + checkFileSystem(); + + } finally { + rollLog = false; + } } } } + + /** [EMAIL PROTECTED] */ + public synchronized void logRollRequested() { + rollLog = true; + this.notifyAll(); + } } /** @@ -396,20 +607,22 @@ this.serverLeaseTimeout = conf.getInt("hbase.master.lease.period", 30 * 1000); - // Cache flushing chore thread. - this.cacheFlusherThread = - new Flusher(this.threadWakeFrequency, stopRequested); + // Cache flushing thread. + this.cacheFlusher = new Flusher(); - // Check regions to see if they need to be split or compacted chore thread - this.splitOrCompactCheckerThread = - new SplitOrCompactChecker(this.stopRequested); + // Compaction thread + this.compactor = new Compactor(); + // Region split thread + this.splitter = new Splitter(); + + // Log rolling thread + this.logRoller = new LogRoller(); + // Task thread to process requests from Master this.worker = new Worker(); this.workerThread = new Thread(worker); this.sleeper = new Sleeper(this.msgInterval, this.stopRequested); - this.logRollerThread = - new LogRoller(this.threadWakeFrequency, stopRequested); // Server to handle client requests this.server = RPC.getServer(this, address.getBindAddress(), address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), @@ -557,14 +770,17 @@ // Send interrupts to wake up threads if sleeping so they notice shutdown. // TODO: Should we check they are alive? 
If OOME could have exited already - synchronized(logRollerLock) { - this.logRollerThread.interrupt(); - } synchronized(cacheFlusherLock) { - this.cacheFlusherThread.interrupt(); + this.cacheFlusher.interrupt(); + } + synchronized (compactionLock) { + this.compactor.interrupt(); } - synchronized(splitOrCompactLock) { - this.splitOrCompactCheckerThread.interrupt(); + synchronized (splitterLock) { + this.splitter.interrupt(); + } + synchronized (logRollerLock) { + this.logRoller.interrupt(); } if (abortRequested) { @@ -657,7 +873,7 @@ "running at " + this.serverInfo.getServerAddress().toString() + " because logdir " + logdir.toString() + " exists"); } - return new HLog(fs, logdir, conf); + return new HLog(fs, logdir, conf, logRoller); } /* @@ -680,16 +896,13 @@ LOG.fatal("Set stop flag in " + t.getName(), e); } }; - Threads.setDaemonThreadRunning(this.cacheFlusherThread, n + ".cacheFlusher", - handler); - Threads.setDaemonThreadRunning(this.splitOrCompactCheckerThread, - n + ".splitOrCompactChecker", handler); - Threads.setDaemonThreadRunning(this.logRollerThread, n + ".logRoller", + Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller", + handler); + Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher", handler); - // Worker is not the same as the above threads in that it does not - // inherit from Chore. Set an UncaughtExceptionHandler on it in case its - // the one to see an OOME, etc., first. The handler will set the stop - // flag. + Threads.setDaemonThreadRunning(this.compactor, n + ".compactor", + handler); + Threads.setDaemonThreadRunning(this.splitter, n + ".splitter", handler); Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler); // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. 
@@ -752,9 +965,10 @@ */ void join() { join(this.workerThread); - join(this.logRollerThread); - join(this.cacheFlusherThread); - join(this.splitOrCompactCheckerThread); + join(this.logRoller); + join(this.cacheFlusher); + join(this.compactor); + join(this.splitter); } private void join(final Thread t) { @@ -925,7 +1139,8 @@ HRegion region = onlineRegions.get(regionInfo.getRegionName()); if(region == null) { region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)), - this.log, FileSystem.get(conf), conf, regionInfo, null); + this.log, FileSystem.get(conf), conf, regionInfo, null, + this.cacheFlusher); this.lock.writeLock().lock(); try { this.log.setSequenceNumber(region.getMinSequenceId()); @@ -1226,6 +1441,11 @@ public AtomicInteger getRequestCount() { return this.requestCount; } + + /** @return reference to CacheFlushListener */ + public CacheFlushListener getCacheFlushListener() { + return this.cacheFlusher; + } /** * Protected utility method for safely obtaining an HRegion handle. @@ -1318,8 +1538,8 @@ * @return Returns list of non-closed regions hosted on this server. If no * regions to check, returns an empty list. */ - protected List<HRegion> getRegionsToCheck() { - ArrayList<HRegion> regionsToCheck = new ArrayList<HRegion>(); + protected Set<HRegion> getRegionsToCheck() { + HashSet<HRegion> regionsToCheck = new HashSet<HRegion>(); //TODO: is this locking necessary? lock.readLock().lock(); try { @@ -1328,8 +1548,7 @@ lock.readLock().unlock(); } // Purge closed regions. - for (final ListIterator<HRegion> i = regionsToCheck.listIterator(); - i.hasNext();) { + for (final Iterator<HRegion> i = regionsToCheck.iterator(); i.hasNext();) { HRegion r = i.next(); if (r.isClosed()) { i.remove();
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LogRollListener.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LogRollListener.java?rev=597959&view=auto ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LogRollListener.java (added) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LogRollListener.java Sat Nov 24 23:17:38 2007 @@ -0,0 +1,29 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase; + +/** + * Mechanism by which the HLog requests a log roll + */ +public interface LogRollListener { + /** Request that the log be rolled */ + public void logRollRequested(); +} Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Sleeper.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Sleeper.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Sleeper.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Sleeper.java Sat Nov 24 23:17:38 2007 @@ -31,6 +31,10 @@ private final int period; private AtomicBoolean stop; + /** + * @param sleep + * @param stop + */ public Sleeper(final int sleep, final AtomicBoolean stop) { this.period = sleep; this.stop = stop; @@ -40,7 +44,7 @@ * Sleep for period. */ public void sleep() { - sleep(System.currentTimeMillis()); + sleep(period); } /** Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml Sat Nov 24 23:17:38 2007 @@ -104,7 +104,16 @@ </description> </property> <property> + <name>hbase.regionserver.optionalcacheflushinterval</name> + <value>10000</value> + <description> + Amount of time to wait since the last time a region was flushed before + invoking an optional cache flush. Default 60,000. 
+ </description> + </property> + <property> <name>hbase.rootdir</name> <value>/hbase</value> - <description>location of HBase instance in dfs</description></property> + <description>location of HBase instance in dfs</description> + </property> </configuration> Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Sat Nov 24 23:17:38 2007 @@ -123,8 +123,8 @@ FileSystem fs = dir.getFileSystem(c); fs.mkdirs(regionDir); return new HRegion(dir, - new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf), - fs, conf, info, null); + new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf, + null), fs, conf, info, null, null); } protected HTableDescriptor createTableDescriptor(final String name) { @@ -365,7 +365,7 @@ return region.getFull(row); } public void flushcache() throws IOException { - this.region.internalFlushcache(this.region.snapshotMemcaches()); + this.region.flushcache(); } } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java Sat Nov 24 23:17:38 2007 @@ 
-257,7 +257,7 @@ for (LocalHBaseCluster.RegionServerThread t: this.hbaseCluster.getRegionServers()) { for(HRegion r: t.getRegionServer().onlineRegions.values() ) { - r.internalFlushcache(r.snapshotMemcaches()); + r.flushcache(); } } } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java Sat Nov 24 23:17:38 2007 @@ -103,9 +103,11 @@ } } - // Flush will provoke a split next time the split-checker thread runs. - r.internalFlushcache(r.snapshotMemcaches()); + // Flush the cache + cluster.getRegionThreads().get(0).getRegionServer().getCacheFlushListener(). + flushRequested(r); + // Now, wait until split makes it into the meta table. 
int oldCount = count; for (int i = 0; i < retries; i++) { Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java Sat Nov 24 23:17:38 2007 @@ -54,10 +54,11 @@ @Override public void setUp() throws Exception { super.setUp(); - this.hlog = new HLog(this.localFs, this.testDir, this.conf); + this.hlog = new HLog(this.localFs, this.testDir, this.conf, null); HTableDescriptor htd = createTableDescriptor(getName()); HRegionInfo hri = new HRegionInfo(htd, null, null); - this.r = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null); + this.r = + new HRegion(testDir, hlog, this.localFs, this.conf, hri, null, null); } /** {@inheritDoc} */ Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java Sat Nov 24 23:17:38 2007 @@ -93,9 +93,9 @@ HRegionInfo.encodeRegionName(info.getRegionName())); fs.mkdirs(regionDir); - HLog log = new HLog(fs, new Path(regionDir, "log"), conf); + HLog log = new HLog(fs, new Path(regionDir, "log"), conf, null); - HRegion region = new HRegion(dir, log, fs, conf, info, null); + HRegion region = new 
HRegion(dir, log, fs, conf, info, null, null); HRegionIncommon r = new HRegionIncommon(region); // Write information to the table @@ -135,7 +135,7 @@ region.close(); log.rollWriter(); - region = new HRegion(dir, log, fs, conf, info, null); + region = new HRegion(dir, log, fs, conf, info, null, null); r = new HRegionIncommon(region); // Read it back @@ -164,7 +164,7 @@ region.close(); log.rollWriter(); - region = new HRegion(dir, log, fs, conf, info, null); + region = new HRegion(dir, log, fs, conf, info, null, null); r = new HRegionIncommon(region); // Read it back Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java Sat Nov 24 23:17:38 2007 @@ -45,6 +45,10 @@ this.table = null; Logger.getRootLogger().setLevel(Level.INFO); + // Make the thread wake frequency a little slower so other threads + // can run + conf.setInt("hbase.server.thread.wakefrequency", 2000); + // Make lease timeout longer, lease checks less frequent conf.setInt("hbase.master.lease.period", 10 * 1000); conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000); @@ -112,7 +116,7 @@ Text rowlabel = new Text("row_" + k); byte bodydata[] = table.get(rowlabel, CONTENTS_BASIC); - assertNotNull(bodydata); + assertNotNull("no data for row " + rowlabel, bodydata); String bodystr = new String(bodydata, HConstants.UTF8_ENCODING).trim(); String teststr = CONTENTSTR + k; assertEquals("Incorrect value for key: (" + rowlabel + "," + CONTENTS_BASIC Modified: 
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java Sat Nov 24 23:17:38 2007 @@ -45,7 +45,7 @@ final Text tableName = new Text("tablename"); final Text row = new Text("row"); Reader reader = null; - HLog log = new HLog(fs, dir, this.conf); + HLog log = new HLog(fs, dir, this.conf, null); try { // Write columns named 1, 2, 3, etc. and then values of single byte // 1, 2, 3... Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java Sat Nov 24 23:17:38 2007 @@ -98,12 +98,12 @@ fs.mkdirs(parentdir); newlogdir = new Path(parentdir, "log"); - log = new HLog(fs, newlogdir, conf); + log = new HLog(fs, newlogdir, conf, null); desc = new HTableDescriptor("test"); desc.addFamily(new HColumnDescriptor("contents:")); desc.addFamily(new HColumnDescriptor("anchor:")); r = new HRegion(parentdir, log, fs, conf, - new HRegionInfo(desc, null, null), null); + new HRegionInfo(desc, null, null), null, null); region = new HRegionIncommon(r); } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java Sat Nov 24 23:17:38 2007 @@ -19,10 +19,12 @@ */ package org.apache.hadoop.hbase; +import java.util.ArrayList; +import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.dfs.MiniDFSCluster; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; /** @@ -32,7 +34,8 @@ private static final Log LOG = LogFactory.getLog(TestLogRolling.class); private MiniDFSCluster dfs; private MiniHBaseCluster cluster; - private Path logdir; + private HRegionServer server; + private HLog log; private String tableName; private byte[] value; @@ -45,10 +48,14 @@ try { this.dfs = null; this.cluster = null; - this.logdir = null; + this.server = null; + this.log = null; this.tableName = null; this.value = null; + // Force a region split after every 768KB + conf.setLong("hbase.hregion.max.filesize", 768L * 1024L); + // We roll the log after every 256 writes conf.setInt("hbase.regionserver.maxlogentries", 256); @@ -118,8 +125,8 @@ // continue } - this.logdir = - cluster.getRegionThreads().get(0).getRegionServer().getLog().dir; + this.server = cluster.getRegionThreads().get(0).getRegionServer(); + this.log = server.getLog(); // When the META table can be opened, the region servers are running @SuppressWarnings("unused") @@ -150,21 +157,6 @@ } } - private int countLogFiles(final boolean print) throws Exception { - Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {this.logdir}); - if (print) { - for (int i = 0; i < logfiles.length; i++) { - if (LOG.isDebugEnabled()) 
{ - LOG.debug("logfile: " + logfiles[i].toString()); - } - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("number of log files: " + logfiles.length); - } - return logfiles.length; - } - /** * Tests that logs are deleted * @@ -172,21 +164,24 @@ */ public void testLogRolling() throws Exception { tableName = getName(); - // Force a region split after every 768KB - conf.setLong("hbase.hregion.max.filesize", 768L * 1024L); try { startAndWriteData(); - int count = countLogFiles(true); - LOG.info("Finished writing. There are " + count + " log files. " + - "Sleeping to let cache flusher and log roller run"); - while (count > 2) { - try { - Thread.sleep(1000L); - } catch (InterruptedException e) { - LOG.info("Sleep interrupted", e); - } - count = countLogFiles(true); + LOG.info("after writing there are " + log.getNumLogFiles() + " log files"); + + // flush all regions + + List<HRegion> regions = + new ArrayList<HRegion>(server.getOnlineRegions().values()); + for (HRegion r: regions) { + r.flushcache(); } + + // Now roll the log + log.rollWriter(); + + int count = log.getNumLogFiles(); + LOG.info("after flushing all regions and rolling logs there are " + + log.getNumLogFiles() + " log files"); assertTrue(count <= 2); } catch (Exception e) { LOG.fatal("unexpected exception", e); Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java Sat Nov 24 23:17:38 2007 @@ -144,9 +144,9 @@ HRegionInfo.encodeRegionName(REGION_INFO.getRegionName())); fs.mkdirs(regionDir); - HLog log = new HLog(fs, new Path(regionDir, 
"log"), conf); + HLog log = new HLog(fs, new Path(regionDir, "log"), conf, null); - r = new HRegion(dir, log, fs, conf, REGION_INFO, null); + r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null); region = new HRegionIncommon(r); // Write information to the meta table @@ -169,7 +169,7 @@ r.close(); log.rollWriter(); - r = new HRegion(dir, log, fs, conf, REGION_INFO, null); + r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null); region = new HRegionIncommon(r); // Verify we can get the data back now that it is on disk. @@ -210,7 +210,7 @@ r.close(); log.rollWriter(); - r = new HRegion(dir, log, fs, conf, REGION_INFO, null); + r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null); region = new HRegionIncommon(r); // Validate again @@ -247,7 +247,7 @@ r.close(); log.rollWriter(); - r = new HRegion(dir, log, fs, conf, REGION_INFO, null); + r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null); region = new HRegionIncommon(r); // Validate again Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java Sat Nov 24 23:17:38 2007 @@ -65,11 +65,11 @@ */ public void testBasicSplit() throws Exception { HRegion region = null; - HLog hlog = new HLog(this.localFs, this.testDir, this.conf); + HLog hlog = new HLog(this.localFs, this.testDir, this.conf, null); try { HTableDescriptor htd = createTableDescriptor(getName()); HRegionInfo hri = new HRegionInfo(htd, null, null); - region = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null); + region = new HRegion(testDir, hlog, 
this.localFs, this.conf, hri, null, null); basicSplit(region); } finally { if (region != null) { @@ -81,7 +81,7 @@ private void basicSplit(final HRegion region) throws Exception { addContent(region, COLFAMILY_NAME3); - region.internalFlushcache(region.snapshotMemcaches()); + region.flushcache(); Text midkey = new Text(); assertTrue(region.needsSplit(midkey)); HRegion [] regions = split(region); @@ -108,12 +108,7 @@ } addContent(regions[i], COLFAMILY_NAME2); addContent(regions[i], COLFAMILY_NAME1); - long startTime = region.snapshotMemcaches(); - if (startTime == -1) { - LOG.info("cache flush not needed"); - } else { - regions[i].internalFlushcache(startTime); - } + regions[i].flushcache(); } // Assert that even if one store file is larger than a reference, the Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java Sat Nov 24 23:17:38 2007 @@ -310,11 +310,11 @@ } private HRegion createRegion() throws IOException { - HLog hlog = new HLog(this.localFs, this.testDir, this.conf); + HLog hlog = new HLog(this.localFs, this.testDir, this.conf, null); HTableDescriptor htd = createTableDescriptor(getName()); htd.addFamily(new HColumnDescriptor(COLUMN, VERSIONS, CompressionType.NONE, false, Integer.MAX_VALUE, null)); HRegionInfo hri = new HRegionInfo(htd, null, null); - return new HRegion(testDir, hlog, this.localFs, this.conf, hri, null); + return new HRegion(testDir, hlog, this.localFs, this.conf, hri, null, null); } } Modified: 
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=597959&r1=597958&r2=597959&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Sat Nov 24 23:17:38 2007 @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.util.Arrays; import java.util.Map; import java.util.TreeMap; @@ -87,6 +88,9 @@ public TestTableMapReduce() { super(); + // The region server doesn't have to talk to the master quite so often + conf.setInt("hbase.regionserver.msginterval", 2000); + // Make the thread wake frequency a little slower so other threads // can run conf.setInt("hbase.server.thread.wakefrequency", 2000); @@ -105,6 +109,9 @@ // Make lease timeout longer, lease checks less frequent conf.setInt("hbase.master.lease.period", 10 * 1000); conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000); + + // Set client pause to the original default + conf.setInt("hbase.client.pause", 10 * 1000); } /** @@ -381,9 +388,11 @@ assertNotNull(firstValue); assertNotNull(secondValue); assertEquals(firstValue.length, secondValue.length); - for (int i=0; i<firstValue.length; i++) { - assertEquals(firstValue[i], secondValue[firstValue.length-i-1]); + byte[] secondReversed = new byte[secondValue.length]; + for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { + secondReversed[i] = secondValue[j]; } + assertTrue(Arrays.equals(firstValue, secondReversed)); } } finally {