Repository: hbase Updated Branches: refs/heads/branch-1.1 aa0e492f8 -> 20d5577d2
HBASE-15100 Master WALProcs are deleted out of order ending up with older wals not removed Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/20d5577d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/20d5577d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/20d5577d Branch: refs/heads/branch-1.1 Commit: 20d5577d2e1b8e395e66be88a66a70bec8665f4f Parents: aa0e492 Author: Matteo Bertozzi <[email protected]> Authored: Fri Jan 22 17:12:18 2016 -0800 Committer: Matteo Bertozzi <[email protected]> Committed: Fri Jan 22 17:12:18 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/ProcedureInfo.java | 20 +++++---- .../hadoop/hbase/procedure2/Procedure.java | 17 ++------ .../hbase/procedure2/ProcedureExecutor.java | 8 ++-- .../hbase/procedure2/store/ProcedureStore.java | 1 + .../procedure2/store/ProcedureStoreTracker.java | 10 +++-- .../store/wal/ProcedureWALFormat.java | 1 - .../store/wal/ProcedureWALFormatReader.java | 9 ++-- .../procedure2/store/wal/WALProcedureStore.java | 32 +++++++------- .../store/TestProcedureStoreTracker.java | 4 +- .../store/wal/TestWALProcedureStore.java | 44 ++++++++++++++++++++ 10 files changed, 91 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java index 4a15857..c79ea98 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java @@ -41,12 +41,12 @@ public class ProcedureInfo { private final String procOwner; 
private final ProcedureState procState; private final long parentId; + private final NonceKey nonceKey; private final ForeignExceptionMessage exception; private final long lastUpdate; private final long startTime; private final byte[] result; - private NonceKey nonceKey = null; private long clientAckTime = -1; public ProcedureInfo( @@ -55,6 +55,7 @@ public class ProcedureInfo { final String procOwner, final ProcedureState procState, final long parentId, + final NonceKey nonceKey, final ForeignExceptionMessage exception, final long lastUpdate, final long startTime, @@ -64,6 +65,7 @@ public class ProcedureInfo { this.procOwner = procOwner; this.procState = procState; this.parentId = parentId; + this.nonceKey = nonceKey; this.lastUpdate = lastUpdate; this.startTime = startTime; @@ -73,8 +75,8 @@ public class ProcedureInfo { } public ProcedureInfo clone() { - return new ProcedureInfo( - procId, procName, procOwner, procState, parentId, exception, lastUpdate, startTime, result); + return new ProcedureInfo(procId, procName, procOwner, procState, parentId, nonceKey, + exception, lastUpdate, startTime, result); } public long getProcId() { @@ -105,10 +107,6 @@ public class ProcedureInfo { return nonceKey; } - public void setNonceKey(NonceKey nonceKey) { - this.nonceKey = nonceKey; - } - public boolean isFailed() { return exception != null; } @@ -216,13 +214,19 @@ public class ProcedureInfo { */ @InterfaceAudience.Private public static ProcedureInfo convert(final ProcedureProtos.Procedure procProto) { + NonceKey nonceKey = null; + if (procProto.getNonce() != HConstants.NO_NONCE) { + nonceKey = new NonceKey(procProto.getNonceGroup(), procProto.getNonce()); + } + return new ProcedureInfo( procProto.getProcId(), procProto.getClassName(), procProto.getOwner(), procProto.getState(), procProto.hasParentId() ? procProto.getParentId() : -1, - procProto.getState() == ProcedureState.ROLLEDBACK ? procProto.getException() : null, + nonceKey, + procProto.hasException() ? 
procProto.getException() : null, procProto.getLastUpdate(), procProto.getStartTime(), procProto.getState() == ProcedureState.FINISHED ? procProto.getResult().toByteArray() : null); http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index b9306c4..304c225 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -600,30 +600,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { */ @InterfaceAudience.Private public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) { - RemoteProcedureException exception; - - if (proc.hasException()) { - exception = proc.getException(); - } else { - exception = null; - } - ProcedureInfo procInfo = new ProcedureInfo( + RemoteProcedureException exception = proc.hasException() ? proc.getException() : null; + return new ProcedureInfo( proc.getProcId(), proc.toStringClass(), proc.getOwner(), proc.getState(), proc.hasParent() ? proc.getParentProcId() : -1, + nonceKey, exception != null ? 
RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null, proc.getLastUpdate(), proc.getStartTime(), proc.getResult()); - - if (nonceKey != null) { - procInfo.setNonceKey(nonceKey); - } - - return procInfo; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index dd439fd..5277fa2 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -813,13 +813,15 @@ public class ProcedureExecutor<TEnvironment> { break; } - if (proc.getProcId() == rootProcId && proc.isSuccess()) { - // Finalize the procedure state + if (proc.isSuccess()) { if (LOG.isDebugEnabled()) { LOG.debug("Procedure completed in " + StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc); } - procedureFinished(proc); + // Finalize the procedure state + if (proc.getProcId() == rootProcId) { + procedureFinished(proc); + } break; } } while (procStack.isFailed()); http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 83933ff..e49475d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -23,6 +23,7 @@ import java.util.Iterator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.procedure2.Procedure; /** http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 6823288..fe2904b 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -414,7 +414,9 @@ public class ProcedureStoreTracker { node.updateState(procId, isDeleted); } - public void clear() { + public void reset() { + this.keepDeletes = false; + this.partial = false; this.map.clear(); resetUpdates(); } @@ -579,11 +581,11 @@ public class ProcedureStoreTracker { } public void readFrom(final InputStream stream) throws IOException { - ProcedureProtos.ProcedureStoreTracker data = + reset(); + final ProcedureProtos.ProcedureStoreTracker data = ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream); - map.clear(); for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList()) { - BitSetNode node = BitSetNode.convert(protoNode); + final BitSetNode node = BitSetNode.convert(protoNode); map.put(node.getStart(), node); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java 
---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java index 17432ac..0df4046 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java @@ -64,7 +64,6 @@ public final class ProcedureWALFormat { } interface Loader { - void removeLog(ProcedureWALFile log); void markCorruptedWAL(ProcedureWALFile log, IOException e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index 4dbd929..7d9a57b 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -25,9 +25,10 @@ import java.util.HashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import 
org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; @@ -91,10 +92,7 @@ public class ProcedureWALFormatReader { loader.markCorruptedWAL(log, e); } - if (localProcedures.isEmpty()) { - LOG.info("No active entry found in state log " + log + ". removing it"); - loader.removeLog(log); - } else { + if (!localProcedures.isEmpty()) { Iterator<Map.Entry<Long, ProcedureProtos.Procedure>> itd = localProcedures.entrySet().iterator(); long minProcId = Long.MAX_VALUE; @@ -160,6 +158,7 @@ public class ProcedureWALFormatReader { } maxProcId = Math.max(maxProcId, entry.getProcId()); localProcedures.remove(entry.getProcId()); + assert !procedures.containsKey(entry.getProcId()); tracker.setDeleted(entry.getProcId(), true); } http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index e5d6869..0089760 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -247,8 +247,13 @@ public class WALProcedureStore implements ProcedureStore { return storeTracker; } - public LinkedList<ProcedureWALFile> getActiveLogs() { - return logs; + public ArrayList<ProcedureWALFile> getActiveLogs() { + lock.lock(); + try { + return new ArrayList<ProcedureWALFile>(logs); + } finally { + lock.unlock(); + } } public Set<ProcedureWALFile> getCorruptedLogs() { @@ -316,17 +321,11 @@ public class WALProcedureStore implements ProcedureStore { } // Load the old logs - final ArrayList<ProcedureWALFile> toRemove = new ArrayList<ProcedureWALFile>(); 
Iterator<ProcedureWALFile> it = logs.descendingIterator(); it.next(); // Skip the current log try { return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { @Override - public void removeLog(ProcedureWALFile log) { - toRemove.add(log); - } - - @Override public void markCorruptedWAL(ProcedureWALFile log, IOException e) { if (corruptedLogs == null) { corruptedLogs = new HashSet<ProcedureWALFile>(); @@ -336,11 +335,6 @@ public class WALProcedureStore implements ProcedureStore { } }); } finally { - if (!toRemove.isEmpty()) { - for (ProcedureWALFile log: toRemove) { - removeLogFile(log); - } - } loading.set(false); } } @@ -589,6 +583,7 @@ public class WALProcedureStore implements ProcedureStore { totalSynced = syncSlots(stream, slots, 0, slotIndex); break; } catch (Throwable e) { + LOG.warn("unable to sync slots, retry=" + retry); if (++retry >= maxRetriesBeforeRoll) { if (logRolled >= maxSyncFailureRoll) { LOG.error("Sync slots after log roll failed, abort.", e); @@ -648,14 +643,15 @@ public class WALProcedureStore implements ProcedureStore { } private boolean rollWriterOrDie() { - for (int i = 1; i <= rollRetries; ++i) { + for (int i = 0; i < rollRetries; ++i) { + if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i); + try { if (rollWriter()) { return true; } } catch (IOException e) { - LOG.warn("Unable to roll the log, attempt=" + i, e); - Threads.sleepWithoutInterrupt(waitBeforeRoll); + LOG.warn("Unable to roll the log, attempt=" + (i + 1), e); } } LOG.fatal("Unable to roll the log"); @@ -902,7 +898,7 @@ public class WALProcedureStore implements ProcedureStore { } } - private long getMaxLogId(final FileStatus[] logFiles) { + private static long getMaxLogId(final FileStatus[] logFiles) { long maxLogId = 0; if (logFiles != null && logFiles.length > 0) { for (int i = 0; i < logFiles.length; ++i) { @@ -945,7 +941,7 @@ public class WALProcedureStore implements ProcedureStore { } catch (IOException e) { LOG.warn("Unable to read tracker 
for " + log + " - " + e.getMessage()); // try the next one... - storeTracker.clear(); + storeTracker.reset(); storeTracker.setPartialFlag(true); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java index 26a94d4..6bc5d36 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -192,7 +192,7 @@ public class TestProcedureStoreTracker { count++; } - tracker.clear(); + tracker.reset(); } } @@ -212,7 +212,7 @@ public class TestProcedureStoreTracker { tracker.setDeleted(i, false); } - tracker.clear(); + tracker.reset(); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 4fea6de..a33f334 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -151,6 +151,50 @@ public class TestWALProcedureStore { } @Test + public void testNoTrailerDoubleRestart() throws Exception { + // log-0001: proc 0, 1 and 2 are inserted + Procedure proc0 = new 
TestSequentialProcedure(); + procStore.insert(proc0, null); + Procedure proc1 = new TestSequentialProcedure(); + procStore.insert(proc1, null); + Procedure proc2 = new TestSequentialProcedure(); + procStore.insert(proc2, null); + procStore.rollWriterForTesting(); + + // log-0002: proc 1 deleted + procStore.delete(proc1.getProcId()); + procStore.rollWriterForTesting(); + + // log-0003: proc 2 is updated + procStore.update(proc2); + procStore.rollWriterForTesting(); + + // log-0004: proc 2 deleted + procStore.delete(proc2.getProcId()); + + // stop the store and remove the trailer + procStore.stop(false); + FileStatus[] logs = fs.listStatus(logDir); + assertEquals(4, logs.length); + for (int i = 0; i < logs.length; ++i) { + corruptLog(logs[i], 4); + } + + // Test Load 1 + assertEquals(1, countProcedures(storeRestart())); + + // Test Load 2 + assertEquals(5, fs.listStatus(logDir).length); + assertEquals(1, countProcedures(storeRestart())); + + // remove proc-0 + procStore.delete(proc0.getProcId()); + procStore.periodicRollForTesting(); + assertEquals(1, fs.listStatus(logDir).length); + assertEquals(0, countProcedures(storeRestart())); + } + + @Test public void testCorruptedTrailer() throws Exception { // Insert something for (int i = 0; i < 100; ++i) {
