Author: jimk Date: Tue Sep 25 12:13:50 2007 New Revision: 579353 URL: http://svn.apache.org/viewvc?rev=579353&view=rev Log: HADOOP-1943 LogRolling test fails: reverting changes for HADOOP-1820
Removed: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=579353&r1=579352&r2=579353&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original) +++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Tue Sep 25 12:13:50 2007 @@ -45,6 +45,7 @@ HADOOP-1813 OOME makes zombie of region server HADOOP-1814 TestCleanRegionServerExit fails too often on Hudson HADOOP-1820 Regionserver creates hlogs without bound + (reverted 2007/09/25) HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8") HADOOP-1832 listTables() returns duplicate tables HADOOP-1834 Scanners ignore timestamp passed on creation Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=579353&r1=579352&r2=579353&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Tue Sep 25 12:13:50 2007 @@ -29,9 +29,6 @@ import java.io.*; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; /** * HLog stores all the edits to the HStore. 
@@ -56,11 +53,6 @@ * older (smaller) than the most-recent CACHEFLUSH message for every HRegion * that has a message in F. * - * <p>synchronized methods can never execute in parallel. However, between the - * start of a cache flush and the completion point, appends are allowed but log - * rolling is not. To prevent log rolling taking place during this period, a - * separate reentrant lock is used. - * * <p>TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs * in HDFS is currently flawed. HBase writes edits to logs and to a memcache. * The 'atomic' write to the log is meant to serve as insurance against @@ -82,21 +74,20 @@ SequenceFile.Writer writer; TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>(); - HashMap<Text, Long> lastSeqWritten = new HashMap<Text, Long>(); + volatile boolean insideCacheFlush = false; + + TreeMap<Text, Long> regionToLastFlush = new TreeMap<Text, Long>(); volatile boolean closed = false; - AtomicLong logSeqNum = new AtomicLong(0); - volatile long filenum = 0; + volatile long logSeqNum = 0; + long filenum = 0; AtomicInteger numEntries = new AtomicInteger(0); - // This lock prevents starting a log roll during a cache flush. - // synchronized is insufficient because a cache flush spans two method calls. - private final Lock cacheFlushLock = new ReentrantLock(); + Integer rollLock = new Integer(0); /** * Split up a bunch of log files, that are no longer being written to, - * into new files, one per region. Delete the old log files when finished. - * + * into new files, one per region. Delete the old log files when ready. * @param rootDir Root directory of the HBase instance * @param srcDir Directory of log files to split: * e.g. <code>${ROOTDIR}/log_HOST_PORT</code> @@ -189,105 +180,109 @@ fs.mkdirs(dir); rollWriter(); } - - /** - * Called by HRegionServer when it opens a new region to ensure that log - * sequence numbers are always greater than the latest sequence number of - * the region being brought on-line. 
- * - * @param newvalue - */ + synchronized void setSequenceNumber(long newvalue) { - if (newvalue > logSeqNum.get()) { + if (newvalue > logSeqNum) { if (LOG.isDebugEnabled()) { LOG.debug("changing sequence number from " + logSeqNum + " to " + newvalue); } - logSeqNum.set(newvalue); + logSeqNum = newvalue; } } /** * Roll the log writer. That is, start writing log messages to a new file. - * - * Because a log cannot be rolled during a cache flush, and a cache flush - * spans two method calls, a special lock needs to be obtained so that a - * cache flush cannot start when the log is being rolled and the log cannot - * be rolled during a cache flush. - * - * Note that this method cannot be synchronized because it is possible that - * startCacheFlush runs, obtaining the cacheFlushLock, then this method could - * start which would obtain the lock on this but block on obtaining the - * cacheFlushLock and then completeCacheFlush could be called which would - * wait for the lock on this and consequently never release the cacheFlushLock + * + * The 'rollLock' prevents us from entering rollWriter() more than + * once at a time. + * + * The 'this' lock limits access to the current writer so + * we don't append multiple items simultaneously. * * @throws IOException */ void rollWriter() throws IOException { - if(closed) { - throw new IOException("Cannot roll log; log is closed"); - } + synchronized(rollLock) { - cacheFlushLock.lock(); // prevent cache flushes - try { - // Now that we have locked out cache flushes, lock this to prevent other - // changes. + // Try to roll the writer to a new file. We may have to + // wait for a cache-flush to complete. In the process, + // compute a list of old log files that can be deleted. + + Vector<Path> toDeleteList = new Vector<Path>(); + synchronized(this) { + if(closed) { + throw new IOException("Cannot roll log; log is closed"); + } - synchronized (this) { - if (writer != null) { // Close the current writer (if any), get a new one. 
+ // Make sure we do not roll the log while inside a + // cache-flush. Otherwise, the log sequence number for + // the CACHEFLUSH operation will appear in a "newer" log file + // than it should. + while(insideCacheFlush) { + try { + wait(); + } catch (InterruptedException ie) { + // continue; + } + } + + // Close the current writer (if any), and grab a new one. + if(writer != null) { writer.close(); Path p = computeFilename(filenum - 1); if(LOG.isDebugEnabled()) { LOG.debug("Closing current log writer " + p.toString() + - " to get a new one"); + " to get a new one"); } if (filenum > 0) { - outputfiles.put(logSeqNum.get() - 1, p); + outputfiles.put(logSeqNum - 1, p); } } Path newPath = computeFilename(filenum++); - this.writer = SequenceFile.createWriter(fs, conf, newPath, HLogKey.class, - HLogEdit.class); - - if (LOG.isDebugEnabled()) { + this.writer = SequenceFile.createWriter(fs, conf, newPath, + HLogKey.class, HLogEdit.class); + if(LOG.isDebugEnabled()) { LOG.debug("new log writer created at " + newPath); } - + // Can we delete any of the old log files? // First, compute the oldest relevant log operation // over all the regions. long oldestOutstandingSeqNum = Long.MAX_VALUE; - for (Long l: lastSeqWritten.values()) { + for(Long l: regionToLastFlush.values()) { long curSeqNum = l.longValue(); - - if (curSeqNum < oldestOutstandingSeqNum) { + + if(curSeqNum < oldestOutstandingSeqNum) { oldestOutstandingSeqNum = curSeqNum; } } - // Get the set of all sequence numbers that are older than the oldest - // pending region operation - - TreeSet<Long> sequenceNumbers = new TreeSet<Long>(); - sequenceNumbers.addAll( - outputfiles.headMap(oldestOutstandingSeqNum).keySet()); - - // Remove all files with a final ID that's older than the oldest - // pending region-operation. 
- - for (Long seq: sequenceNumbers) { - Path p = outputfiles.remove(seq); - if(LOG.isDebugEnabled()) { - LOG.debug("removing old log file " + p.toString()); + // Next, remove all files with a final ID that's older + // than the oldest pending region-operation. + for(Iterator<Long> it = outputfiles.keySet().iterator(); it.hasNext();) { + long maxSeqNum = it.next().longValue(); + if(maxSeqNum < oldestOutstandingSeqNum) { + Path p = outputfiles.get(maxSeqNum); + it.remove(); + toDeleteList.add(p); + + } else { + break; } - fs.delete(p); } - this.numEntries.set(0); } - - } finally { - cacheFlushLock.unlock(); + + // Actually delete them, if any! + for(Iterator<Path> it = toDeleteList.iterator(); it.hasNext(); ) { + Path p = it.next(); + if(LOG.isDebugEnabled()) { + LOG.debug("removing old log file " + p.toString()); + } + fs.delete(p); + } + this.numEntries.set(0); } } @@ -333,9 +328,7 @@ * other systems should process the log appropriately upon each startup * (and prior to initializing HLog). * - * synchronized prevents appends during the completion of a cache flush or - * for the duration of a log roll. - * + * We need to seize a lock on the writer so that writes are atomic. * @param regionName * @param tableName * @param row @@ -344,19 +337,21 @@ * @throws IOException */ synchronized void append(Text regionName, Text tableName, Text row, - TreeMap<Text, byte []> columns, long timestamp) throws IOException { + TreeMap<Text, byte []> columns, long timestamp) + throws IOException { if(closed) { throw new IOException("Cannot append; log is closed"); } - - long seqNum[] = obtainSeqNum(columns.size()); - // The 'lastSeqWritten' map holds the sequence number of the most recent - // write for each region. 
When the cache is flushed, the entry for the - // region being flushed is removed if the sequence number of the flush - // is greater than or equal to the value in lastSeqWritten - - lastSeqWritten.put(regionName, seqNum[seqNum.length - 1]); + long seqNum[] = obtainSeqNum(columns.size()); + + // The 'regionToLastFlush' map holds the sequence id of the + // most recent flush for every regionName. However, for regions + // that don't have any flush yet, the relevant operation is the + // first one that's been added. + if (regionToLastFlush.get(regionName) == null) { + regionToLastFlush.put(regionName, seqNum[0]); + } int counter = 0; for (Map.Entry<Text, byte []> es: columns.entrySet()) { @@ -368,39 +363,29 @@ } } - /** - * @return How many items have been added to the log - * - * Because numEntries is an AtomicInteger, no locking is required. - */ + /** @return How many items have been added to the log */ int getNumEntries() { return numEntries.get(); } /** - * Obtain a log sequence number. - * - * Because it is only called from a synchronized method, no additional locking - * is required. + * Obtain a log sequence number. This seizes the whole HLog + * lock, but it shouldn't last too long. */ - private long obtainSeqNum() { - return logSeqNum.getAndIncrement(); + synchronized long obtainSeqNum() { + return logSeqNum++; } /** * Obtain a specified number of sequence numbers * - * Because it is only called from a synchronized method, no additional locking - * is required. - * * @param num - number of sequence numbers to obtain * @return - array of sequence numbers */ - private long[] obtainSeqNum(int num) { - long sequenceNumber = logSeqNum.getAndAdd(num); + synchronized long[] obtainSeqNum(int num) { long[] results = new long[num]; for (int i = 0; i < num; i++) { - results[i] = sequenceNumber++; + results[i] = logSeqNum++; } return results; } @@ -409,50 +394,54 @@ * By acquiring a log sequence ID, we can allow log messages * to continue while we flush the cache. 
* - * Acquire a lock so that we do not roll the log between the start - * and completion of a cache-flush. Otherwise the log-seq-id for + * Set a flag so that we do not roll the log between the start + * and complete of a cache-flush. Otherwise the log-seq-id for * the flush will not appear in the correct logfile. - * * @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)} * @see #completeCacheFlush(Text, Text, long) * @see #abortCacheFlush() */ synchronized long startCacheFlush() { - cacheFlushLock.lock(); + while (this.insideCacheFlush) { + try { + wait(); + } catch (InterruptedException ie) { + // continue + } + } + this.insideCacheFlush = true; + notifyAll(); return obtainSeqNum(); } - /** - * Complete the cache flush - * - * Protected by this.lock() - * + /** Complete the cache flush * @param regionName * @param tableName * @param logSeqId * @throws IOException */ synchronized void completeCacheFlush(final Text regionName, - final Text tableName, final long logSeqId) throws IOException { - - try { - if(this.closed) { - return; - } - - writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId), - new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(), - System.currentTimeMillis())); - - numEntries.getAndIncrement(); - Long seq = lastSeqWritten.get(regionName); - if (seq != null && logSeqId >= seq) { - lastSeqWritten.remove(regionName); - } - - } finally { - cacheFlushLock.unlock(); + final Text tableName, final long logSeqId) + throws IOException { + if(this.closed) { + return; } + + if (!this.insideCacheFlush) { + throw new IOException("Impossible situation: inside " + + "completeCacheFlush(), but 'insideCacheFlush' flag is false"); + } + HLogKey key = new HLogKey(regionName, tableName, HLog.METAROW, logSeqId); + this.writer.append(key, + new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(), + System.currentTimeMillis())); + this.numEntries.getAndIncrement(); + + // Remember the most-recent flush
for each region. + // This is used to delete obsolete log files. + this.regionToLastFlush.put(regionName, Long.valueOf(logSeqId)); + + cleanup(); } /** @@ -462,8 +451,23 @@ * is a restart of the regionserver so the snapshot content dropped by the * failure gets restored to the memcache. */ - void abortCacheFlush() { - this.cacheFlushLock.unlock(); + synchronized void abortCacheFlush() { + cleanup(); + } + + private synchronized void cleanup() { + this.insideCacheFlush = false; + notifyAll(); + } + + /** + * Abort a cache flush. + * This method will clear waits on {@link #insideCacheFlush} but if this + * method is called, we are losing data. TODO: Fix. + */ + synchronized void abort() { + this.insideCacheFlush = false; + notifyAll(); } private static void usage() { Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=579353&r1=579352&r2=579353&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Tue Sep 25 12:13:50 2007 @@ -210,7 +210,6 @@ final int memcacheFlushSize; final int blockingMemcacheSize; protected final long threadWakeFrequency; - protected final int optionalFlushCount; private final HLocking lock = new HLocking(); private long desiredMaxFileSize; private final long maxSequenceId; @@ -248,8 +247,6 @@ this.regionInfo = regionInfo; this.memcache = new HMemcache(); this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); - this.optionalFlushCount = - conf.getInt("hbase.hregion.memcache.optionalflushcount", 10); // Declare the regionName.
@@ -731,13 +728,11 @@ void optionallyFlush() throws IOException { if(this.memcache.getSize() > this.memcacheFlushSize) { flushcache(false); - } else if (this.memcache.getSize() > 0) { - if (this.noFlushCount >= this.optionalFlushCount) { - LOG.info("Optional flush called " + this.noFlushCount + - " times when data present without flushing. Forcing one."); - flushcache(false); - - } else { + } else if (this.memcache.getSize() > 0 && this.noFlushCount >= 10) { + LOG.info("Optional flush called " + this.noFlushCount + + " times when data present without flushing. Forcing one."); + flushcache(false); + if (this.memcache.getSize() > 0) { // Only increment if something in the cache. // Gets zero'd when a flushcache is called. this.noFlushCount++; @@ -869,31 +864,25 @@ retval.memcacheSnapshot.size()); } - try { - // A. Flush memcache to all the HStores. - // Keep running vector of all store files that includes both old and the - // just-made new flush store file. - for (HStore hstore: stores.values()) { - hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId); - } - } catch (IOException e) { - // An exception here means that the snapshot was not persisted. - // The hlog needs to be replayed so its content is restored to memcache. - // Currently, only a server restart will do this. - this.log.abortCacheFlush(); - throw new DroppedSnapshotException(e.getMessage()); + // A. Flush memcache to all the HStores. + // Keep running vector of all store files that includes both old and the + // just-made new flush store file. + for (HStore hstore: stores.values()) { + hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId); } - // If we get to here, the HStores have been written. If we get an - // error in completeCacheFlush it will release the lock it is holding - // B. Write a FLUSHCACHE-COMPLETE message to the log. 
// This tells future readers that the HStores were emitted correctly, // and that all updates to the log for this regionName that have lower // log-sequence-ids can be safely ignored. this.log.completeCacheFlush(this.regionInfo.regionName, - regionInfo.tableDesc.getName(), logCacheFlushId); - + regionInfo.tableDesc.getName(), logCacheFlushId); + } catch (IOException e) { + // An exception here means that the snapshot was not persisted. + // The hlog needs to be replayed so its content is restored to memcache. + // Currently, only a server restart will do this. + this.log.abortCacheFlush(); + throw new DroppedSnapshotException(e.getMessage()); } finally { // C. Delete the now-irrelevant memcache snapshot; its contents have been // dumped to disk-based HStores or, if error, clear aborted snapshot.