Author: stack
Date: Thu Oct 11 07:45:36 2007
New Revision: 583839

URL: http://svn.apache.org/viewvc?rev=583839&view=rev
Log:
HADOOP-2029 TestLogRolling fails too often in patch and nightlies
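In brief: lastSeqWritten now records, per region, the sequence id of the
oldest edit that has not yet been flushed, and rollWriter() consults it to
decide which closed log files are safe to delete. A minimal sketch of that
pruning rule follows, with simplified stand-in names (LogPruneSketch,
deletableLogs, and String in place of the Text and Path types); it is not
the actual HLog code.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;
    import java.util.TreeSet;

    // Sketch only: LogPruneSketch and deletableLogs() are illustrative
    // names, not part of HLog; String stands in for Text/Path.
    public class LogPruneSketch {
      // Highest sequence id that went into each closed log file
      // (plays the role of HLog.outputfiles).
      private final TreeMap<Long, String> outputfiles =
        new TreeMap<Long, String>();
      // Oldest unflushed edit id per region (HLog.lastSeqWritten).
      private final Map<String, Long> lastSeqWritten =
        new HashMap<String, Long>();

      // Closed logs that a roll may delete.
      SortedMap<Long, String> deletableLogs() {
        if (lastSeqWritten.isEmpty()) {
          // Every region has flushed; all closed logs are obsolete.
          return outputfiles;
        }
        // Otherwise a log is deletable only if its highest edit id is
        // older than the oldest edit still awaiting a cache flush.
        Long oldestOutstanding =
          new TreeSet<Long>(lastSeqWritten.values()).first();
        return outputfiles.headMap(oldestOutstanding);
      }
    }

If no region has outstanding edits, every closed log can go; otherwise only
logs whose highest sequence id precedes the oldest outstanding edit are
removed.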
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=583839&r1=583838&r2=583839&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Thu Oct 11 07:45:36 2007
@@ -74,6 +74,7 @@
                daemon scripts
    HADOOP-2017 TestRegionServerAbort failure in patch build #903 and
                nightly #266
+   HADOOP-2029 TestLogRolling fails too often in patch and nightlies
 
 IMPROVEMENTS
    HADOOP-1737 Make HColumnDescriptor data publically members settable

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=583839&r1=583838&r2=583839&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Thu Oct 11 07:45:36 2007
@@ -83,26 +83,28 @@
  */
 public class HLog implements HConstants {
   private static final Log LOG = LogFactory.getLog(HLog.class);
-
-  static final String HLOG_DATFILE = "hlog.dat.";
-
+  private static final String HLOG_DATFILE = "hlog.dat.";
   static final Text METACOLUMN = new Text("METACOLUMN:");
-
   static final Text METAROW = new Text("METAROW");
-
-  FileSystem fs;
-
-  Path dir;
-
-  Configuration conf;
-
+  final FileSystem fs;
+  final Path dir;
+  final Configuration conf;
   final long threadWakeFrequency;
+
+  /*
+   * Current log file.
+   */
   SequenceFile.Writer writer;
-  TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
+
+  /*
+   * Map of all log files but the current one.
+   */
+  final TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
-  HashMap<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
+
+  /*
+   * Map of region to last sequence/edit id.
+   */
+  final Map<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
 
   volatile boolean closed = false;
@@ -129,11 +131,12 @@
    * @throws IOException
    */
   static void splitLog(Path rootDir, Path srcDir, FileSystem fs,
-    Configuration conf) throws IOException {
+    Configuration conf)
+  throws IOException {
     Path logfiles[] = fs.listPaths(new Path[] { srcDir });
     LOG.info("splitting " + logfiles.length + " log(s) in " +
       srcDir.toString());
-    HashMap<Text, SequenceFile.Writer> logWriters =
+    Map<Text, SequenceFile.Writer> logWriters =
       new HashMap<Text, SequenceFile.Writer>();
     try {
       for (int i = 0; i < logfiles.length; i++) {
@@ -156,12 +159,12 @@
         SequenceFile.Writer w = logWriters.get(regionName);
         if (w == null) {
           Path logfile = new Path(HRegion.getRegionDir(rootDir,
-            regionName), HREGION_OLDLOGFILE_NAME);
+              regionName), HREGION_OLDLOGFILE_NAME);
           if (LOG.isDebugEnabled()) {
             LOG.debug("getting new log file writer for path " + logfile);
           }
           w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
-            HLogEdit.class);
+              HLogEdit.class);
           logWriters.put(regionName, w);
         }
         if (LOG.isDebugEnabled()) {
@@ -202,12 +205,12 @@
    * @param conf
    * @throws IOException
    */
-  HLog(FileSystem fs, Path dir, Configuration conf) throws IOException {
+  HLog(final FileSystem fs, final Path dir, final Configuration conf)
+  throws IOException {
     this.fs = fs;
     this.dir = dir;
     this.conf = conf;
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
-
     if (fs.exists(dir)) {
       throw new IOException("Target HLog directory already exists: " + dir);
     }
@@ -242,7 +245,7 @@
    * flush cannot start when the log is being rolled and the log cannot be
    * rolled during a cache flush.
    *
-   * Note that this method cannot be synchronized because it is possible that
+   * <p>Note that this method cannot be synchronized because it is possible that
    * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
    * start which would obtain the lock on this but block on obtaining the
    * cacheFlushLock and then completeCacheFlush could be called which would wait
@@ -253,81 +256,94 @@
   synchronized void rollWriter() throws IOException {
     boolean locked = false;
     while (!locked && !closed) {
-      if (cacheFlushLock.tryLock()) {
+      if (this.cacheFlushLock.tryLock()) {
         locked = true;
         break;
       }
       try {
         this.wait(threadWakeFrequency);
       } catch (InterruptedException e) {
+        // continue
       }
     }
     if (closed) {
       if (locked) {
-        cacheFlushLock.unlock();
+        this.cacheFlushLock.unlock();
       }
       throw new IOException("Cannot roll log; log is closed");
     }
 
     // If we get here we have locked out both cache flushes and appends
-
     try {
-      if (writer != null) {
+      if (this.writer != null) {
         // Close the current writer, get a new one.
-        writer.close();
+        this.writer.close();
         Path p = computeFilename(filenum - 1);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Closing current log writer " + p.toString() +
-            " to get a new one");
+              " to get a new one");
         }
         if (filenum > 0) {
-          synchronized (sequenceLock) {
-            outputfiles.put(logSeqNum - 1, p);
+          synchronized (this.sequenceLock) {
+            this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), p);
          }
        }
      }
      Path newPath = computeFilename(filenum++);
-      this.writer = SequenceFile.createWriter(fs, conf, newPath,
+      this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
        HLogKey.class, HLogEdit.class);
-
      LOG.info("new log writer created at " + newPath);
 
      // Can we delete any of the old log files?
-
-      TreeSet<Long> sequenceNumbers =
-        new TreeSet<Long>(lastSeqWritten.values());
-
-      if (sequenceNumbers.size() > 0) {
-        long oldestOutstandingSeqNum = sequenceNumbers.first();
-
-        // Get the set of all log files whose final ID is older than the oldest
-        // pending region operation
-
-        sequenceNumbers.clear();
-        sequenceNumbers.addAll(outputfiles.headMap(
-            oldestOutstandingSeqNum).keySet());
-
-        // Now remove old log files (if any)
-
-        for (Long seq : sequenceNumbers) {
-          Path p = outputfiles.remove(seq);
-          LOG.info("removing old log file " + p.toString());
-          fs.delete(p);
+      if (this.outputfiles.size() > 0) {
+        if (this.lastSeqWritten.size() <= 0) {
+          LOG.debug("Last sequence written is empty. Deleting all old hlogs");
+          // If so, then no new writes have come in since all regions were
+          // flushed (and removed from the lastSeqWritten map). Means can
+          // remove all but currently open log file.
+          for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
+            deleteLogFile(e.getValue(), e.getKey());
+          }
+          this.outputfiles.clear();
+        } else {
+          // Get oldest edit/sequence id. If logs are older than this id,
+          // then safe to remove.
+          TreeSet<Long> sequenceNumbers =
+            new TreeSet<Long>(this.lastSeqWritten.values());
+          long oldestOutstandingSeqNum = sequenceNumbers.first().longValue();
+          // Get the set of all log files whose final ID is older than the
+          // oldest pending region operation
+          sequenceNumbers.clear();
+          sequenceNumbers.addAll(this.outputfiles.headMap(
+            Long.valueOf(oldestOutstandingSeqNum)).keySet());
+          // Now remove old log files (if any)
+          LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
+            "using oldest outstanding seqnum of " + oldestOutstandingSeqNum);
+          for (Long seq : sequenceNumbers) {
+            deleteLogFile(this.outputfiles.remove(seq), seq);
+          }
         }
       }
       this.numEntries = 0;
-
     } finally {
-      cacheFlushLock.unlock();
+      this.cacheFlushLock.unlock();
     }
   }
+
+  private void deleteLogFile(final Path p, final Long seqno)
+  throws IOException {
+    LOG.info("removing old log file " + p.toString() +
+      " whose highest sequence/edit id is " + seqno);
+    this.fs.delete(p);
+  }
 
   /**
    * This is a convenience method that computes a new filename with a given
    * file-number.
    */
   Path computeFilename(final long fn) {
-    return new Path(dir, HLOG_DATFILE + String.format("%1$03d", fn));
+    return new Path(dir,
+      HLOG_DATFILE + String.format("%1$03d", Long.valueOf(fn)));
   }
@@ -378,27 +394,26 @@
    * @throws IOException
    */
   synchronized void append(Text regionName, Text tableName, Text row,
-    TreeMap<Text, byte[]> columns, long timestamp) throws IOException {
+    TreeMap<Text, byte[]> columns, long timestamp)
+  throws IOException {
     if (closed) {
       throw new IOException("Cannot append; log is closed");
     }
-
     long seqNum[] = obtainSeqNum(columns.size());
-
-    // The 'lastSeqWritten' map holds the sequence number of the most recent
+    // The 'lastSeqWritten' map holds the sequence number of the oldest
     // write for each region. When the cache is flushed, the entry for the
     // region being flushed is removed if the sequence number of the flush
-    // is greater than or equal to the value in lastSeqWritten
-
-    lastSeqWritten.put(regionName, seqNum[seqNum.length - 1]);
-
+    // is greater than or equal to the value in lastSeqWritten.
+    if (!this.lastSeqWritten.containsKey(regionName)) {
+      this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0]));
+    }
     int counter = 0;
     for (Map.Entry<Text, byte[]> es : columns.entrySet()) {
       HLogKey logKey =
         new HLogKey(regionName, tableName, row, seqNum[counter++]);
       HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp);
-      writer.append(logKey, logEdit);
-      numEntries++;
+      this.writer.append(logKey, logEdit);
+      this.numEntries++;
     }
   }
@@ -426,9 +441,9 @@
    */
   private long[] obtainSeqNum(int num) {
     long[] results = new long[num];
-    synchronized (sequenceLock) {
+    synchronized (this.sequenceLock) {
       for (int i = 0; i < num; i++) {
-        results[i] = logSeqNum++;
+        results[i] = this.logSeqNum++;
       }
     }
     return results;
@@ -447,7 +462,7 @@
    * @see #abortCacheFlush()
    */
   long startCacheFlush() {
-    cacheFlushLock.lock();
+    this.cacheFlushLock.lock();
     return obtainSeqNum();
   }
@@ -462,25 +477,22 @@
    * @throws IOException
    */
   synchronized void completeCacheFlush(final Text regionName,
-    final Text tableName, final long logSeqId) throws IOException {
-
+    final Text tableName, final long logSeqId)
+  throws IOException {
     try {
       if (this.closed) {
         return;
       }
-
-      writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
-        new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
-          System.currentTimeMillis()));
-
-      numEntries++;
-      Long seq = lastSeqWritten.get(regionName);
-      if (seq != null && logSeqId >= seq) {
-        lastSeqWritten.remove(regionName);
+      this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
+        new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
+          System.currentTimeMillis()));
+      this.numEntries++;
+      Long seq = this.lastSeqWritten.get(regionName);
+      if (seq != null && logSeqId >= seq.longValue()) {
+        this.lastSeqWritten.remove(regionName);
       }
-
     } finally {
-      cacheFlushLock.unlock();
+      this.cacheFlushLock.unlock();
       notifyAll(); // wake up the log roller if it is waiting
     }
   }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java?rev=583839&r1=583838&r2=583839&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java Thu Oct 11 07:45:36 2007
@@ -19,8 +19,6 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.io.IOException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.dfs.MiniDFSCluster;
@@ -128,9 +126,10 @@
     try {
       Thread.sleep(10 * 1000); // Wait for region server to start
     } catch (InterruptedException e) {
+      // continue
     }
-    logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir;
+    this.logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir;
 
     // When the META table can be opened, the region servers are running
     @SuppressWarnings("unused")
@@ -155,13 +154,14 @@
         try {
           Thread.sleep(2000);
         } catch (InterruptedException e) {
+          // continue
         }
       }
     }
   }
 
-  private int countLogFiles(boolean print) throws IOException {
-    Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {logdir});
+  private int countLogFiles(final boolean print) throws Exception {
+    Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {this.logdir});
     if (print) {
       for (int i = 0; i < logfiles.length; i++) {
         if (LOG.isDebugEnabled()) {
@@ -186,15 +186,18 @@
     conf.setLong("hbase.hregion.max.filesize", 768L * 1024L);
     try {
       startAndWriteData();
-      LOG.info("Finished writing. Sleeping to let cache flusher and log roller run");
-      try {
-        // Wait for log roller and cache flusher to run a few times...
-        Thread.sleep(30L * 1000L);
-      } catch (InterruptedException e) {
-        LOG.info("Sleep interrupted", e);
+      int count = countLogFiles(true);
+      LOG.info("Finished writing. There are " + count + " log files. " +
+        "Sleeping to let cache flusher and log roller run");
+      while (count > 2) {
+        try {
+          Thread.sleep(1000L);
+        } catch (InterruptedException e) {
+          LOG.info("Sleep interrupted", e);
+        }
+        count = countLogFiles(true);
       }
-      LOG.info("Wake from sleep");
-      assertTrue(countLogFiles(true) <= 2);
+      assertTrue(count <= 2);
     } catch (Exception e) {
       LOG.fatal("unexpected exception", e);
       throw e;
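The bookkeeping that feeds the pruning rule: append() records only the
first, hence oldest, unflushed sequence id for a region, and
completeCacheFlush() clears the entry once a flush covers it. A hedged
sketch of that protocol, with hypothetical onAppend/onFlushComplete names
standing in for HLog.append() and HLog.completeCacheFlush(), and String in
place of Text:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: onAppend/onFlushComplete are illustrative stand-ins
    // for HLog.append() and HLog.completeCacheFlush().
    class SeqTrackingSketch {
      private final Map<String, Long> lastSeqWritten =
        new HashMap<String, Long>();

      // On append: remember the first sequence id of the batch, but only
      // if the region has no entry yet, so the map always holds the
      // oldest unflushed id per region.
      void onAppend(String regionName, long firstSeqOfBatch) {
        if (!lastSeqWritten.containsKey(regionName)) {
          lastSeqWritten.put(regionName, Long.valueOf(firstSeqOfBatch));
        }
      }

      // On flush completion: once the flush covers the recorded id, the
      // region has no outstanding edits and stops pinning closed logs.
      void onFlushComplete(String regionName, long flushSeqId) {
        Long seq = lastSeqWritten.get(regionName);
        if (seq != null && flushSeqId >= seq.longValue()) {
          lastSeqWritten.remove(regionName);
        }
      }
    }

This is also why TestLogRolling can now poll countLogFiles() until at most
two logs remain, rather than asserting after a fixed 30-second sleep.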