Author: stack Date: Sat Sep 15 14:11:47 2007 New Revision: 575982 URL: http://svn.apache.org/viewvc?rev=575982&view=rev Log: HADOOP-1903 Possible data loss if Exception happens between snapshot and flush to disk.
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=575982&r1=575981&r2=575982&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original) +++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sat Sep 15 14:11:47 2007 @@ -48,6 +48,8 @@ HADOOP-1870 Once file system failure has been detected, don't check it again and get on with shutting down the hbase cluster. HADOOP-1888 NullPointerException in HMemcacheScanner + HADOOP-1903 Possible data loss if Exception happens between snapshot and flush + to disk. 
IMPROVEMENTS HADOOP-1737 Make HColumnDescriptor data publically members settable Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java?rev=575982&view=auto ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java (added) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java Sat Sep 15 14:11:47 2007 @@ -0,0 +1,32 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; +import java.io.IOException; + + +/** + * Thrown during flush if the possibility snapshot content was not properly + * persisted into store files. Response should include replay of hlog content. 
+ */ +public class DroppedSnapshotException extends IOException { + public DroppedSnapshotException(String msg) { + super(msg); + } + + public DroppedSnapshotException() { + super(); + } +} Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=575982&r1=575981&r2=575982&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Sat Sep 15 14:11:47 2007 @@ -399,6 +399,7 @@ * the flush will not appear in the correct logfile. * @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)} * @see #completeCacheFlush(Text, Text, long) + * @see #abortCacheFlush() */ synchronized long startCacheFlush() { while (this.insideCacheFlush) { @@ -422,7 +423,7 @@ synchronized void completeCacheFlush(final Text regionName, final Text tableName, final long logSeqId) throws IOException { - if(closed) { + if(this.closed) { return; } @@ -430,17 +431,32 @@ throw new IOException("Impossible situation: inside " + "completeCacheFlush(), but 'insideCacheFlush' flag is false"); } - - writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId), + HLogKey key = new HLogKey(regionName, tableName, HLog.METAROW, logSeqId); + this.writer.append(key, new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(), System.currentTimeMillis())); - numEntries.getAndIncrement(); + this.numEntries.getAndIncrement(); // Remember the most-recent flush for each region. // This is used to delete obsolete log files. - regionToLastFlush.put(regionName, logSeqId); + this.regionToLastFlush.put(regionName, Long.valueOf(logSeqId)); - insideCacheFlush = false; + cleanup(); + } + + /** + * Abort a cache flush. 
+ * This method will clear waits on {@link #insideCacheFlush}. Call if the + * flush fails. Note that the only recovery for an aborted flush currently + * is a restart of the regionserver so the snapshot content dropped by the + * failure gets restored to the memcache. + */ + synchronized void abortCacheFlush() { + cleanup(); + } + + private synchronized void cleanup() { + this.insideCacheFlush = false; notifyAll(); } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=575982&r1=575981&r2=575982&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Sat Sep 15 14:11:47 2007 @@ -657,7 +657,7 @@ if (checkFileSystem()) { // If filesystem is OK, is the exception a ConnectionException? // If so, mark the server as down. No point scanning either - // if no server to put meta region on. + // if no server to put meta region on. TODO. 
if (e instanceof ConnectException) { LOG.debug("Region hosting server is gone."); } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?rev=575982&r1=575981&r2=575982&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Sat Sep 15 14:11:47 2007 @@ -101,6 +101,7 @@ } Snapshot retval = new Snapshot(memcache, Long.valueOf(log.startCacheFlush())); + // From here on, any failure is catastrophic requiring replay of hlog this.snapshot = memcache; history.add(memcache); memcache = new TreeMap<HStoreKey, byte []>(); Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=575982&r1=575981&r2=575982&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Sat Sep 15 14:11:47 2007 @@ -721,6 +721,9 @@ /** * Each HRegion is given a periodic chance to flush the cache, which it should * only take if there have been a lot of uncommitted writes. + * @throws IOException + * @throws DroppedSnapshotException Thrown when replay of hlog is required + * because a Snapshot was not properly persisted. 
*/ void optionallyFlush() throws IOException { if(this.memcache.getSize() > this.memcacheFlushSize) { @@ -754,6 +757,9 @@ * close() the HRegion shortly, so the HRegion should not take on any new and * potentially long-lasting disk operations. This flush() should be the final * pre-close() disk operation. + * @throws IOException + * @throws DroppedSnapshotException Thrown when replay of hlog is required + * because a Snapshot was not properly persisted. */ void flushcache(boolean disableFutureWrites) throws IOException { @@ -815,6 +821,9 @@ * routes. * * <p> This method may block for some time. + * @throws IOException + * @throws DroppedSnapshotException Thrown when replay of hlog is required + * because a Snapshot was not properly persisted. */ void internalFlushcache() throws IOException { long startTime = -1; @@ -833,13 +842,19 @@ // // When execution returns from snapshotMemcacheForLog() with a non-NULL // value, the HMemcache will have a snapshot object stored that must be - // explicitly cleaned up using a call to deleteSnapshot(). + // explicitly cleaned up using a call to deleteSnapshot() or by calling + // abort. // HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log); if(retval == null || retval.memcacheSnapshot == null) { LOG.debug("Finished memcache flush; empty snapshot"); return; } + + // Any failure from here on out will be catastrophic requiring server + // restart so hlog content can be replayed and put back into the memcache. + // Otherwise, the snapshot content while backed up in the hlog, it will not + // be part of the current running servers state. try { long logCacheFlushId = retval.sequenceId; if(LOG.isDebugEnabled()) { @@ -852,7 +867,7 @@ // A. Flush memcache to all the HStores. // Keep running vector of all store files that includes both old and the // just-made new flush store file. 
- for(HStore hstore: stores.values()) { + for (HStore hstore: stores.values()) { hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId); } @@ -860,17 +875,18 @@ // This tells future readers that the HStores were emitted correctly, // and that all updates to the log for this regionName that have lower // log-sequence-ids can be safely ignored. - - log.completeCacheFlush(this.regionInfo.regionName, - regionInfo.tableDesc.getName(), logCacheFlushId); + this.log.completeCacheFlush(this.regionInfo.regionName, + regionInfo.tableDesc.getName(), logCacheFlushId); } catch (IOException e) { - LOG.fatal("Interrupted while flushing. Edits lost. FIX! HADOOP-1903", e); - log.abort(); - throw e; + // An exception here means that the snapshot was not persisted. + // The hlog needs to be replayed so its content is restored to memcache. + // Currently, only a server restart will do this. + this.log.abortCacheFlush(); + throw new DroppedSnapshotException(e.getMessage()); } finally { // C. Delete the now-irrelevant memcache snapshot; its contents have been - // dumped to disk-based HStores. - memcache.deleteSnapshot(); + // dumped to disk-based HStores or, if error, clear aborted snapshot. + this.memcache.deleteSnapshot(); } // D. Finally notify anyone waiting on memcache to clear: @@ -1386,7 +1402,7 @@ } /* - * Add updates to the log and add values to the memcache. + * Add updates first to the hlog and then add values to memcache. * Warning: Assumption is caller has lock on passed in row. * @param row Row to update. 
* @param timestamp Timestamp to record the updates against Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=575982&r1=575981&r2=575982&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Sat Sep 15 14:11:47 2007 @@ -292,6 +292,16 @@ for(HRegion cur: nonClosedRegionsToFlush) { try { cur.optionallyFlush(); + } catch (DroppedSnapshotException e) { + // Cache flush can fail in a few places. If it fails in a critical + // section, we get a DroppedSnapshotException and a replay of hlog + // is required. Currently the only way to do this is a restart of + // the server. + LOG.fatal("Replay of hlog required. Forcing server restart", e); + if (!checkFileSystem()) { + break; + } + HRegionServer.this.stop(); } catch (IOException iex) { LOG.error("Cache flush failed", RemoteExceptionHandler.checkIOException(iex)); @@ -442,11 +452,11 @@ /** * Sets a flag that will cause all the HRegionServer threads to shut down - * in an orderly fashion. - * <p>FOR DEBUGGING ONLY + * in an orderly fashion. Used by unit tests and called by {@link Flusher} + * if it judges server needs to be restarted. */ synchronized void stop() { - stopRequested.set(true); + this.stopRequested.set(true); notifyAll(); // Wakes run() if it is sleeping } @@ -457,7 +467,7 @@ * from under hbase or we OOME. 
*/ synchronized void abort() { - abortRequested = true; + this.abortRequested = true; stop(); } @@ -621,7 +631,7 @@ if (this.fsOk) { // Only try to clean up if the file system is available try { - log.close(); + this.log.close(); LOG.info("On abort, closed hlog"); } catch (IOException e) { LOG.error("Unable to close log in abort", @@ -661,7 +671,7 @@ } join(); - LOG.info("main thread exiting"); + LOG.info(Thread.currentThread().getName() + " exiting"); } /* @@ -674,7 +684,7 @@ * run. On its way out, this server will shut down Server. Leases are sort * of inbetween. It has an internal thread that while it inherits from * Chore, it keeps its own internal stop mechanism so needs to be stopped - * by this hosting server. + * by this hosting server. Worker logs the exception and exits. */ private void startAllServices() { String n = Thread.currentThread().getName(); @@ -731,6 +741,7 @@ } } + /** Add to the outbound message buffer */ private void reportOpen(HRegion region) { synchronized(outboundMsgs) { @@ -790,58 +801,58 @@ public void run() { try { - for(ToDoEntry e = null; !stopRequested.get(); ) { - try { - e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); - } catch (InterruptedException ex) { - // continue - } - if(e == null || stopRequested.get()) { - continue; - } - try { - LOG.info(e.msg.toString()); - - switch(e.msg.getMsg()) { - - case HMsg.MSG_REGION_OPEN: - // Open a region - openRegion(e.msg.getRegionInfo()); - break; + for(ToDoEntry e = null; !stopRequested.get(); ) { + try { + e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + // continue + } + if(e == null || stopRequested.get()) { + continue; + } + try { + LOG.info(e.msg.toString()); + switch(e.msg.getMsg()) { + + case HMsg.MSG_REGION_OPEN: + // Open a region + openRegion(e.msg.getRegionInfo()); + break; - case HMsg.MSG_REGION_CLOSE: - // Close a region - closeRegion(e.msg.getRegionInfo(), true); - break; + case HMsg.MSG_REGION_CLOSE: + // Close a 
region + closeRegion(e.msg.getRegionInfo(), true); + break; - case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: - // Close a region, don't reply - closeRegion(e.msg.getRegionInfo(), false); - break; + case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: + // Close a region, don't reply + closeRegion(e.msg.getRegionInfo(), false); + break; - default: - throw new AssertionError( - "Impossible state during msg processing. Instruction: " - + e.msg.toString()); - } - } catch (IOException ie) { - ie = RemoteExceptionHandler.checkIOException(ie); - if(e.tries < numRetries) { - LOG.warn(ie); - e.tries++; - try { - toDo.put(e); - } catch (InterruptedException ex) { - throw new RuntimeException("Putting into msgQueue was interrupted.", ex); + default: + throw new AssertionError( + "Impossible state during msg processing. Instruction: " + + e.msg.toString()); } - } else { - LOG.error("unable to process message: " + e.msg.toString(), ie); - if (!checkFileSystem()) { - break; + } catch (IOException ie) { + ie = RemoteExceptionHandler.checkIOException(ie); + if(e.tries < numRetries) { + LOG.warn(ie); + e.tries++; + try { + toDo.put(e); + } catch (InterruptedException ex) { + throw new RuntimeException("Putting into msgQueue was " + + "interrupted.", ex); + } + } else { + LOG.error("unable to process message: " + e.msg.toString(), ie); + if (!checkFileSystem()) { + break; + } } } } - } } catch(Throwable t) { LOG.fatal("Unhandled exception", t); } finally {