Author: stack
Date: Sat Oct 16 05:27:57 2010
New Revision: 1023183
URL: http://svn.apache.org/viewvc?rev=1023183&view=rev
Log:
HBASE-2933 Skip EOF Errors during Log Recovery
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
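The heart of the change is wrapping the recovered-edits replay loop in a try/catch: an EOFException now signals a truncated edits file (typically from a master failure mid-split, so the tail of the data survives in another edits file), and the file is moved aside instead of aborting the region open. Below is a minimal, self-contained sketch of that pattern; the Reader interface and the rename logic are stand-ins for HBase's HLog.Reader and HLog.moveAsideBadEditsFile, not the real API.

    import java.io.EOFException;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class ReplaySketch {
      /** Stand-in for HLog.Reader: next() returns null at a clean end of
       *  file and throws EOFException if the file is truncated mid-record. */
      interface Reader extends AutoCloseable {
        byte[] next() throws IOException;
        void close() throws IOException;
      }

      /** Replays one recovered-edits file; a truncated file is moved aside
       *  rather than failing the whole region open. */
      static long replay(Path edits, Reader reader) throws IOException {
        long editsCount = 0;
        try {
          byte[] entry;
          while ((entry = reader.next()) != null) {
            // Applying the edit to the right Store would happen here.
            editsCount++;
          }
        } catch (EOFException eof) {
          // Truncation usually means the master died mid-split, so this tail
          // already lives in another edits file: rename instead of rethrowing.
          Path aside = edits.resolveSibling(edits.getFileName() + ".corrupt");
          Files.move(edits, aside);
          System.err.println("EOF in " + edits + "; moved aside as " + aside);
        } finally {
          reader.close();
        }
        return editsCount;
      }
    }

Note that only EOF (and, in the patch below, a recognized corruption message) is swallowed; any other IOException still propagates.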
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1023183&r1=1023182&r2=1023183&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Sat Oct 16 05:27:57 2010
@@ -586,6 +586,8 @@ Release 0.21.0 - Unreleased
HBASE-3044 [replication] ReplicationSource won't cleanup logs if there's
nothing to replicate
HBASE-3113 Don't reassign regions if cluster is being shutdown
+ HBASE-2933 Skip EOF Errors during Log Recovery
+ (Nicolas Spiegelberg via Stack)
IMPROVEMENTS
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1023183&r1=1023182&r2=1023183&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Oct 16 05:27:57 2010
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
@@ -1807,83 +1808,85 @@ public class HRegion implements HeapSize
LOG.info("Replaying edits from " + edits + "; minSequenceid=" + minSeqId);
HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
try {
- return replayRecoveredEdits(reader, minSeqId, reporter);
- } finally {
- reader.close();
- }
- }
-
- /* @param reader Reader against file of recovered edits.
- * @param minSeqId Any edit found in split editlogs needs to be in excess of
- * this minSeqId to be applied, else its skipped.
- * @param reporter
- * @return the sequence id of the last edit added to this region out of the
- * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
- * @throws IOException
- */
- private long replayRecoveredEdits(final HLog.Reader reader,
- final long minSeqId, final Progressable reporter)
- throws IOException {
long currentEditSeqId = minSeqId;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
HLog.Entry entry;
Store store = null;
- // How many edits to apply before we send a progress report.
- int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
- while ((entry = reader.next()) != null) {
- HLogKey key = entry.getKey();
- WALEdit val = entry.getEdit();
- if (firstSeqIdInLog == -1) {
- firstSeqIdInLog = key.getLogSeqNum();
- }
- // Now, figure if we should skip this edit.
- if (key.getLogSeqNum() <= currentEditSeqId) {
- skippedEdits++;
- continue;
- }
- currentEditSeqId = key.getLogSeqNum();
- boolean flush = false;
- for (KeyValue kv: val.getKeyValues()) {
- // Check this edit is for me. Also, guard against writing the special
- // METACOLUMN info such as HBASE::CACHEFLUSH entries
- if (kv.matchingFamily(HLog.METAFAMILY) ||
- !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) {
- skippedEdits++;
- continue;
+
+ try {
+ // How many edits to apply before we send a progress report.
+ int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
+ while ((entry = reader.next()) != null) {
+ HLogKey key = entry.getKey();
+ WALEdit val = entry.getEdit();
+ if (firstSeqIdInLog == -1) {
+ firstSeqIdInLog = key.getLogSeqNum();
}
- // Figure which store the edit is meant for.
- if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
- store = this.stores.get(kv.getFamily());
- }
- if (store == null) {
- // This should never happen. Perhaps schema was changed between
- // crash and redeploy?
- LOG.warn("No family for " + kv);
+ // Now, figure if we should skip this edit.
+ if (key.getLogSeqNum() <= currentEditSeqId) {
skippedEdits++;
continue;
}
- // Once we are over the limit, restoreEdit will keep returning true to
- // flush -- but don't flush until we've played all the kvs that make up
- // the WALEdit.
- flush = restoreEdit(store, kv);
- editsCount++;
- }
- if (flush) internalFlushcache(null, currentEditSeqId);
-
- // Every 'interval' edits, tell the reporter we're making progress.
- // Have seen 60k edits taking 3minutes to complete.
- if (reporter != null && (editsCount % interval) == 0) {
- reporter.progress();
+ currentEditSeqId = key.getLogSeqNum();
+ boolean flush = false;
+ for (KeyValue kv: val.getKeyValues()) {
+ // Check this edit is for me. Also, guard against writing the special
+ // METACOLUMN info such as HBASE::CACHEFLUSH entries
+ if (kv.matchingFamily(HLog.METAFAMILY) ||
+ !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) {
+ skippedEdits++;
+ continue;
+ }
+ // Figure which store the edit is meant for.
+ if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
+ store = this.stores.get(kv.getFamily());
+ }
+ if (store == null) {
+ // This should never happen. Perhaps schema was changed between
+ // crash and redeploy?
+ LOG.warn("No family for " + kv);
+ skippedEdits++;
+ continue;
+ }
+ // Once we are over the limit, restoreEdit will keep returning true to
+ // flush -- but don't flush until we've played all the kvs that make up
+ // the WALEdit.
+ flush = restoreEdit(store, kv);
+ editsCount++;
+ }
+ if (flush) internalFlushcache(null, currentEditSeqId);
+
+ // Every 'interval' edits, tell the reporter we're making progress.
+ // Have seen 60k edits taking 3minutes to complete.
+ if (reporter != null && (editsCount % interval) == 0) {
+ reporter.progress();
+ }
+ }
+ } catch (EOFException eof) {
+ Path p = HLog.moveAsideBadEditsFile(fs, edits);
+ LOG.warn("Encountered EOF. Most likely due to Master failure during " +
+ "log spliting, so we have this data in another edit. " +
+ "Continuing, but renaming " + edits + " as " + p, eof);
+ } catch (IOException ioe) {
+ if (ioe.getMessage().startsWith("File is corrupt")) {
+ Path p = HLog.moveAsideBadEditsFile(fs, edits);
+ LOG.warn("File corruption encountered! " +
+ "Continuing, but renaming " + edits + " as " + p, ioe);
+ } else {
+ throw ioe;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
- ", firstSequenceidInLog=" + firstSeqIdInLog +
- ", maxSequenceidInLog=" + currentEditSeqId);
+ ", firstSequenceidInLog=" + firstSeqIdInLog +
+ ", maxSequenceidInLog=" + currentEditSeqId);
}
return currentEditSeqId;
+ } finally {
+ reader.close();
+ }
}
/**
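A note on the second catch arm in the hunk above: it distinguishes corruption from other IOExceptions by message text, since the reader exposed no dedicated corruption exception type. A hedged sketch of that check in isolation; the "File is corrupt" prefix is simply whatever the underlying reader emits, and the null guard is an addition, not part of the patch.

    import java.io.IOException;

    class CorruptionCheck {
      /** Mirrors the patch's message-based corruption test, with a null guard
       *  added: the patch calls ioe.getMessage().startsWith(...) directly,
       *  which would itself throw NullPointerException for an IOException
       *  constructed without a message. */
      static boolean isCorruptionError(IOException ioe) {
        String msg = ioe.getMessage();
        return msg != null && msg.startsWith("File is corrupt");
      }
    }

Matching on message text couples recovery behavior to the reader's wording, so it is fragile; the patch accordingly scopes it narrowly to this one replay path.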