Author: apurtell
Date: Sat Jul 11 22:53:14 2009
New Revision: 793239

URL: http://svn.apache.org/viewvc?rev=793239&view=rev
Log:
HBASE-1470
Modified:
    hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt?rev=793239&r1=793238&r2=793239&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt Sat Jul 11 22:53:14 2009
@@ -468,6 +468,7 @@
    HBASE-1643 ScanDeleteTracker takes comparator but it's unused
    HBASE-1603 MR failed "RetriesExhaustedException: Trying to contact region
               server Some server for region TestTable..." -- debugging
+   HBASE-1470 hbase and HADOOP-4379, dhruba's flush/sync

  OPTIMIZATIONS
    HBASE-1412 Change values for delete column and column family in KeyValue

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=793239&r1=793238&r2=793239&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sat Jul 11 22:53:14 2009
@@ -23,6 +23,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -63,6 +64,7 @@
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.SequenceFile.Reader;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.fs.FSDataOutputStream;

 /**
  * HLog stores all the edits to the HStore.
@@ -112,7 +114,10 @@
   private final int flushlogentries;
   private final AtomicInteger unflushedEntries = new AtomicInteger(0);
   private volatile long lastLogFlushTime;
-
+  private final boolean append;
+  private final Method syncfs;
+  private final static Object [] NO_ARGS = new Object []{};
+
   /*
    * Current log file.
    */
@@ -213,6 +218,21 @@
       ", flushlogentries=" + this.flushlogentries +
       ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
     rollWriter();
+    // Test if syncfs is available.
+    this.append = conf.getBoolean("dfs.support.append", false);
+    Method m = null;
+    if (this.append) {
+      try {
+        m = this.writer.getClass().getMethod("syncFs", new Class<?> []{});
+        LOG.debug("Using syncFs--hadoop-4379");
+      } catch (SecurityException e) {
+        throw new IOException("Failed test for syncfs", e);
+      } catch (NoSuchMethodException e) {
+        // This can happen on a Hadoop without the HADOOP-4379 patch.
+        LOG.info("syncFs--hadoop-4379 not available");
+      }
+    }
+    this.syncfs = m;
   }

   /**
@@ -585,7 +605,15 @@

   public void sync() throws IOException {
     lastLogFlushTime = System.currentTimeMillis();
-    this.writer.sync();
+    if (this.append && syncfs != null) {
+      try {
+        this.syncfs.invoke(this.writer, NO_ARGS);
+      } catch (Exception e) {
+        throw new IOException("Reflection", e);
+      }
+    } else {
+      this.writer.sync();
+    }
     this.unflushedEntries.set(0);
   }

@@ -821,15 +849,16 @@
           step * DEFAULT_NUMBER_CONCURRENT_LOG_READS +
           DEFAULT_NUMBER_CONCURRENT_LOG_READS;
         for (int i = (step * 10); i < endIndex; i++) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
-              ": " + logfiles[i].getPath() +
-              ", length=" + logfiles[i].getLen());
-          }
           // Check for possibly empty file. With appends, currently Hadoop
           // reports a zero length even if the file has been sync'd. Revisit if
           // HADOOP-4751 is committed.
           long length = logfiles[i].getLen();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
+              ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
+          }
+          boolean append = conf.getBoolean("dfs.support.append", false);
+          recoverLog(fs, logfiles[i].getPath(), append);
           SequenceFile.Reader in = null;
           int count = 0;
           try {
@@ -853,10 +882,10 @@
               key = new HLogKey();
               val = new KeyValue();
             }
-            LOG.debug("Pushed " + count + " entries from " +
+            LOG.debug("Pushed=" + count + " entries from " +
              logfiles[i].getPath());
           } catch (IOException e) {
-            LOG.debug("IOE Pushed " + count + " entries from " +
+            LOG.debug("IOE Pushed=" + count + " entries from " +
              logfiles[i].getPath());
             e = RemoteExceptionHandler.checkIOException(e);
             if (!(e instanceof EOFException)) {
@@ -866,7 +895,7 @@
           }
         } catch (IOException e) {
           if (length <= 0) {
-            LOG.warn("Empty hlog, continuing: " + logfiles[i]);
+            LOG.warn("Empty hlog, continuing: " + logfiles[i] + " count=" + count, e);
             continue;
           }
           throw e;
@@ -943,9 +972,6 @@
             fs.delete(oldlogfile, true);
           }
         }
-        if (wap == null) {
-          throw new NullPointerException();
-        }
         wap.w.append(logEntry.getKey(), logEntry.getEdit());
         count++;
       }
@@ -1030,6 +1056,40 @@
   public static String getHLogDirectoryName(HServerInfo info) {
     return getHLogDirectoryName(HServerInfo.getServerName(info));
   }
+
+  /*
+   * Recover the log. If append has been set, try to open the log in append
+   * mode. Doing this, we get hold of the file that the crashed writer was
+   * writing to. Once we have it, close it. This will allow a subsequent
+   * reader to see edits up to the last sync.
+   * @param fs
+   * @param p
+   * @param append
+   */
+  private static void recoverLog(final FileSystem fs, final Path p,
+      final boolean append) {
+    if (!append) {
+      return;
+    }
+    // Trying recovery
+    boolean recovered = false;
+    while (!recovered) {
+      try {
+        FSDataOutputStream out = fs.append(p);
+        out.close();
+        recovered = true;
+      } catch (IOException e) {
+        LOG.info("Failed open for append, waiting on lease recovery: " + p, e);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          // ignore and try again
+        }
+      }
+    }
+    LOG.info("Past lease recovery: " + p);
+  }

   /**
    * Construct the HLog directory name
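
The syncFs change above is a probe-once pattern: the HLog constructor asks the
SequenceFile.Writer it was handed whether a zero-argument "syncFs" method
exists (it does only on a Hadoop carrying the HADOOP-4379 patch) and caches
the java.lang.reflect.Method, so each later sync() pays only a Method.invoke.
A minimal standalone sketch of that pattern follows; WriterStub is a
hypothetical stand-in for SequenceFile.Writer, used only so the example runs
without a patched Hadoop on the classpath.

import java.lang.reflect.Method;

public class SyncFsProbe {
  private static final Object[] NO_ARGS = new Object[] {};

  // Hypothetical stand-in for SequenceFile.Writer.
  static class WriterStub {
    public void syncFs() { System.out.println("syncFs() called"); }
    public void sync()   { System.out.println("plain sync() called"); }
  }

  // Probe once, at construction time; null means the method is absent.
  static Method probe(Object target, String name) {
    try {
      return target.getClass().getMethod(name, new Class<?>[] {});
    } catch (NoSuchMethodException e) {
      return null; // expected on a Hadoop without HADOOP-4379
    }
  }

  public static void main(String[] args) throws Exception {
    WriterStub writer = new WriterStub();
    Method syncFs = probe(writer, "syncFs");
    if (syncFs != null) {
      syncFs.invoke(writer, NO_ARGS); // flush through to the filesystem
    } else {
      writer.sync(); // fall back to the old writer-local sync
    }
  }
}

Probing once and caching the Method keeps the getMethod lookup off the write
path; sync() runs for every flushed batch of edits, so a per-call reflective
lookup would be wasted work.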
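The new recoverLog relies on a side effect of HDFS append: opening a file for
append takes over the lease still held by the crashed writer, and closing the
stream completes lease recovery, so a subsequent SequenceFile.Reader can see
edits up to the last sync. Below is a minimal sketch of that idea against the
plain FileSystem API; the maxRetries bound is an illustrative addition (the
committed code retries indefinitely), and like the patch it assumes a Hadoop
build with append support (dfs.support.append=true).

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class LeaseRecovery {
  // Re-open a dead writer's log for append, then close it immediately.
  // The open takes over the file lease; the close releases it, which
  // finishes recovery and makes the synced edits readable.
  public static void recoverLog(FileSystem fs, Path p, int maxRetries)
      throws IOException {
    for (int attempt = 0; attempt < maxRetries; attempt++) {
      try {
        FSDataOutputStream out = fs.append(p);
        out.close();
        return;
      } catch (IOException e) {
        // The namenode has not yet expired the old lease; wait and retry.
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted waiting on lease recovery", e);
        }
      }
    }
    throw new IOException("Lease not recovered after " + maxRetries +
      " attempts: " + p);
  }
}

A caller splitting logs would invoke this once per file before opening a
reader, e.g. recoverLog(fs, logPath, 60) to give up after roughly a minute
instead of blocking forever (logPath here is hypothetical).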