HBASE-14947 WALProcedureStore improvements
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/20b6dba9 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/20b6dba9 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/20b6dba9 Branch: refs/heads/branch-1.1 Commit: 20b6dba997acd973fb8bd74666c42165bc715dbc Parents: 2c93616 Author: Matteo Bertozzi <[email protected]> Authored: Tue Dec 15 10:15:18 2015 -0800 Committer: Matteo Bertozzi <[email protected]> Committed: Tue Dec 15 12:52:52 2015 -0800 ---------------------------------------------------------------------- .../procedure2/store/ProcedureStoreTracker.java | 21 +- .../procedure2/store/wal/WALProcedureStore.java | 296 ++++++++++--------- .../store/TestProcedureStoreTracker.java | 35 +-- .../store/wal/TestWALProcedureStore.java | 32 +- 4 files changed, 194 insertions(+), 190 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/20b6dba9/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 8516f61..6823288 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -27,7 +27,6 @@ import java.util.TreeMap; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; /** @@ -356,25 +355,19 @@ public class ProcedureStoreTracker { } } - public void insert(final Procedure proc, final Procedure[] subprocs) { - insert(proc.getProcId()); - if (subprocs != null) { - for (int i = 0; i < subprocs.length; ++i) { - insert(subprocs[i].getProcId()); - } - } - } - - public void update(final Procedure proc) { - update(proc.getProcId()); - } - public void insert(long procId) { BitSetNode node = getOrCreateNode(procId); node.update(procId); trackProcIds(procId); } + public void insert(final long procId, final long[] subProcIds) { + update(procId); + for (int i = 0; i < subProcIds.length; ++i) { + insert(subProcIds[i]); + } + } + public void update(long procId) { Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId); assert entry != null : "expected node to update procId=" + procId; http://git-wip-us.apache.org/repos/asf/hbase/blob/20b6dba9/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 0dcdad9..e5d6869 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -104,8 +104,8 @@ public class WALProcedureStore implements ProcedureStore { private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>(); private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker(); - private final AtomicLong inactiveLogsMaxId = new AtomicLong(0); private final AtomicBoolean running = new AtomicBoolean(false); + private final ReentrantLock lock = new ReentrantLock(); private final Condition waitCond = lock.newCondition(); private final Condition slotCond = lock.newCondition(); @@ -196,19 +196,16 @@ public class WALProcedureStore implements ProcedureStore { } LOG.info("Stopping the WAL Procedure Store"); - if (lock.tryLock()) { - try { - waitCond.signalAll(); - syncCond.signalAll(); - } finally { - lock.unlock(); - } - } + sendStopSignal(); if (!abort) { try { - syncThread.join(); + while (syncThread.isAlive()) { + sendStopSignal(); + syncThread.join(250); + } } catch (InterruptedException e) { + LOG.warn("join interrupted", e); Thread.currentThread().interrupt(); } } @@ -225,6 +222,17 @@ public class WALProcedureStore implements ProcedureStore { logs.clear(); } + private void sendStopSignal() { + if (lock.tryLock()) { + try { + waitCond.signalAll(); + syncCond.signalAll(); + } finally { + lock.unlock(); + } + } + } + @Override public boolean isRunning() { return running.get(); @@ -259,32 +267,36 @@ public class WALProcedureStore implements ProcedureStore { @Override public void recoverLease() throws IOException { - LOG.info("Starting WAL Procedure Store lease recovery"); - FileStatus[] oldLogs = getLogFiles(); - while (running.get()) { - // Get Log-MaxID and recover lease on old logs - flushLogId = initOldLogs(oldLogs) + 1; - - // Create new state-log - if (!rollWriter(flushLogId)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Someone else has already created log: " + flushLogId); + lock.lock(); + try { + LOG.info("Starting WAL Procedure Store lease recovery"); + FileStatus[] oldLogs = getLogFiles(); + while (isRunning()) { + // Get Log-MaxID and recover lease on old logs + flushLogId = initOldLogs(oldLogs); + + // Create new state-log + if (!rollWriter(flushLogId + 1)) { + // someone else has already created this log + LOG.debug("someone else has already created log " + flushLogId); + continue; } - continue; - } - // We have the lease on the log - oldLogs = getLogFiles(); - if (getMaxLogId(oldLogs) > flushLogId) { - if (LOG.isDebugEnabled()) { - LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId); + // We have the lease on the log + oldLogs = getLogFiles(); + if (getMaxLogId(oldLogs) > flushLogId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId); + } + logs.getLast().removeFile(); + continue; } - logs.getLast().removeFile(); - continue; - } - LOG.info("Lease acquired for flushLogId: " + flushLogId); - break; + LOG.info("Lease acquired for flushLogId: " + flushLogId); + break; + } + } finally { + lock.unlock(); } } @@ -340,18 +352,22 @@ public class WALProcedureStore implements ProcedureStore { } ByteSlot slot = acquireSlot(); - long logId = -1; try { // Serialize the insert + long[] subProcIds = null; if (subprocs != null) { ProcedureWALFormat.writeInsert(slot, proc, subprocs); + subProcIds = new long[subprocs.length]; + for (int i = 0; i < subprocs.length; ++i) { + subProcIds[i] = subprocs[i].getProcId(); + } } else { assert !proc.hasParent(); ProcedureWALFormat.writeInsert(slot, proc); } // Push the transaction data and wait until it is persisted - logId = pushData(slot); + pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds); } catch (IOException e) { // We are not able to serialize the procedure. // this is a code error, and we are not able to go on. @@ -361,14 +377,6 @@ public class WALProcedureStore implements ProcedureStore { } finally { releaseSlot(slot); } - - // Update the store tracker - synchronized (storeTracker) { - storeTracker.insert(proc, subprocs); - if (logId == flushLogId) { - checkAndTryRoll(); - } - } } @Override @@ -378,13 +386,12 @@ public class WALProcedureStore implements ProcedureStore { } ByteSlot slot = acquireSlot(); - long logId = -1; try { // Serialize the update ProcedureWALFormat.writeUpdate(slot, proc); // Push the transaction data and wait until it is persisted - logId = pushData(slot); + pushData(PushType.UPDATE, slot, proc.getProcId(), null); } catch (IOException e) { // We are not able to serialize the procedure. // this is a code error, and we are not able to go on. @@ -393,20 +400,6 @@ public class WALProcedureStore implements ProcedureStore { } finally { releaseSlot(slot); } - - // Update the store tracker - boolean removeOldLogs = false; - synchronized (storeTracker) { - storeTracker.update(proc); - if (logId == flushLogId) { - removeOldLogs = storeTracker.isUpdated(); - checkAndTryRoll(); - } - } - - if (removeOldLogs) { - setInactiveLogsMaxId(logId - 1); - } } @Override @@ -416,13 +409,12 @@ public class WALProcedureStore implements ProcedureStore { } ByteSlot slot = acquireSlot(); - long logId = -1; try { // Serialize the delete ProcedureWALFormat.writeDelete(slot, procId); // Push the transaction data and wait until it is persisted - logId = pushData(slot); + pushData(PushType.DELETE, slot, procId, null); } catch (IOException e) { // We are not able to serialize the procedure. // this is a code error, and we are not able to go on. @@ -431,22 +423,6 @@ public class WALProcedureStore implements ProcedureStore { } finally { releaseSlot(slot); } - - boolean removeOldLogs = false; - synchronized (storeTracker) { - storeTracker.delete(procId); - if (logId == flushLogId) { - if (storeTracker.isEmpty() || storeTracker.isUpdated()) { - removeOldLogs = checkAndTryRoll(); - } else { - checkAndTryRoll(); - } - } - } - - if (removeOldLogs) { - setInactiveLogsMaxId(logId); - } } private ByteSlot acquireSlot() { @@ -459,7 +435,10 @@ public class WALProcedureStore implements ProcedureStore { slotsCache.offer(slot); } - private long pushData(final ByteSlot slot) { + private enum PushType { INSERT, UPDATE, DELETE }; + + private long pushData(final PushType type, final ByteSlot slot, + final long procId, final long[] subProcIds) { if (!isRunning()) { throw new RuntimeException("the store must be running before inserting data"); } @@ -486,6 +465,7 @@ public class WALProcedureStore implements ProcedureStore { } } + updateStoreTracker(type, procId, subProcIds); slots[slotIndex++] = slot; logId = flushLogId; @@ -514,20 +494,29 @@ public class WALProcedureStore implements ProcedureStore { return logId; } - private boolean isSyncAborted() { - return syncException.get() != null; + private void updateStoreTracker(final PushType type, + final long procId, final long[] subProcIds) { + switch (type) { + case INSERT: + if (subProcIds == null) { + storeTracker.insert(procId); + } else { + storeTracker.insert(procId, subProcIds); + } + break; + case UPDATE: + storeTracker.update(procId); + break; + case DELETE: + storeTracker.delete(procId); + break; + default: + throw new RuntimeException("invalid push type " + type); + } } - protected void periodicRoll() throws IOException { - long logId; - boolean removeOldLogs; - synchronized (storeTracker) { - logId = flushLogId; - removeOldLogs = storeTracker.isEmpty(); - } - if (checkAndTryRoll() && removeOldLogs) { - setInactiveLogsMaxId(logId); - } + private boolean isSyncAborted() { + return syncException.get() != null; } private void syncLoop() throws Throwable { @@ -539,7 +528,7 @@ public class WALProcedureStore implements ProcedureStore { // Wait until new data is available if (slotIndex == 0) { if (!loading.get()) { - removeInactiveLogs(); + periodicRoll(); } if (LOG.isTraceEnabled()) { @@ -552,7 +541,6 @@ public class WALProcedureStore implements ProcedureStore { waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS); if (slotIndex == 0) { // no data.. probably a stop() or a periodic roll - periodicRoll(); continue; } } @@ -565,13 +553,12 @@ public class WALProcedureStore implements ProcedureStore { long syncWaitMs = System.currentTimeMillis() - syncWaitSt; if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) { float rollSec = getMillisFromLastRoll() / 1000.0f; - LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s/sec", + LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)", StringUtils.humanTimeDiff(syncWaitMs), slotIndex, StringUtils.humanSize(totalSynced.get()), StringUtils.humanSize(totalSynced.get() / rollSec))); } - inSync.set(true); totalSynced.addAndGet(syncSlots()); slotIndex = 0; @@ -660,8 +647,7 @@ public class WALProcedureStore implements ProcedureStore { } } - @VisibleForTesting - public boolean rollWriterOrDie() { + private boolean rollWriterOrDie() { for (int i = 1; i <= rollRetries; ++i) { try { if (rollWriter()) { @@ -677,17 +663,13 @@ public class WALProcedureStore implements ProcedureStore { throw new RuntimeException("unable to roll the log"); } - protected boolean checkAndTryRoll() { - if (!isRunning()) return false; - - if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) { - try { - return rollWriter(); - } catch (IOException e) { - LOG.warn("Unable to roll the log", e); - } + private boolean tryRollWriter() { + try { + return rollWriter(); + } catch (IOException e) { + LOG.warn("Unable to roll the log", e); + return false; } - return false; } private long getMillisToNextPeriodicRoll() { @@ -701,7 +683,52 @@ public class WALProcedureStore implements ProcedureStore { return (System.currentTimeMillis() - lastRollTs.get()); } - protected boolean rollWriter() throws IOException { + @VisibleForTesting + protected void periodicRollForTesting() throws IOException { + lock.lock(); + try { + periodicRoll(); + } finally { + lock.unlock(); + } + } + + @VisibleForTesting + protected boolean rollWriterForTesting() throws IOException { + lock.lock(); + try { + return rollWriter(); + } finally { + lock.unlock(); + } + } + + private void periodicRoll() throws IOException { + if (storeTracker.isEmpty()) { + if (LOG.isTraceEnabled()) { + LOG.trace("no active procedures"); + } + tryRollWriter(); + removeAllLogs(flushLogId - 1); + } else { + if (storeTracker.isUpdated()) { + if (LOG.isTraceEnabled()) { + LOG.trace("all the active procedures are in the latest log"); + } + removeAllLogs(flushLogId - 1); + } + + // if the log size has exceeded the roll threshold + // or the periodic roll timeout is expired, try to roll the wal. + if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) { + tryRollWriter(); + } + + removeInactiveLogs(); + } + } + + private boolean rollWriter() throws IOException { // Create new state-log if (!rollWriter(flushLogId + 1)) { LOG.warn("someone else has already created log " + flushLogId); @@ -721,6 +748,9 @@ public class WALProcedureStore implements ProcedureStore { } private boolean rollWriter(final long logId) throws IOException { + assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId; + assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked(); + ProcedureWALHeader header = ProcedureWALHeader.newBuilder() .setVersion(ProcedureWALFormat.HEADER_VERSION) .setType(ProcedureWALFormat.LOG_TYPE_STREAM) @@ -749,20 +779,16 @@ public class WALProcedureStore implements ProcedureStore { newStream.close(); return false; } - lock.lock(); - try { - closeStream(); - synchronized (storeTracker) { - storeTracker.resetUpdates(); - } - stream = newStream; - flushLogId = logId; - totalSynced.set(0); - lastRollTs.set(System.currentTimeMillis()); - logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos)); - } finally { - lock.unlock(); - } + + closeStream(); + + storeTracker.resetUpdates(); + stream = newStream; + flushLogId = logId; + totalSynced.set(0); + lastRollTs.set(System.currentTimeMillis()); + logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos)); + if (LOG.isDebugEnabled()) { LOG.debug("Roll new state log: " + logId); } @@ -773,11 +799,9 @@ public class WALProcedureStore implements ProcedureStore { try { if (stream != null) { try { - synchronized (storeTracker) { - ProcedureWALFile log = logs.getLast(); - log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId()); - ProcedureWALFormat.writeTrailer(stream, storeTracker); - } + ProcedureWALFile log = logs.getLast(); + log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId()); + ProcedureWALFormat.writeTrailer(stream, storeTracker); } catch (IOException e) { LOG.warn("Unable to write the trailer: " + e.getMessage()); } @@ -793,30 +817,12 @@ public class WALProcedureStore implements ProcedureStore { // ========================================================================== // Log Files cleaner helpers // ========================================================================== - private void setInactiveLogsMaxId(long logId) { - long expect = 0; - while (!inactiveLogsMaxId.compareAndSet(expect, logId)) { - expect = inactiveLogsMaxId.get(); - if (expect >= logId) { - break; - } - } - } - private void removeInactiveLogs() { - long lastLogId = inactiveLogsMaxId.get(); - if (lastLogId != 0) { - removeAllLogs(lastLogId); - inactiveLogsMaxId.compareAndSet(lastLogId, 0); - } - // Verify if the ProcId of the first oldest is still active. if not remove the file. while (logs.size() > 1) { ProcedureWALFile log = logs.getFirst(); - synchronized (storeTracker) { - if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) { - break; - } + if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) { + break; } removeLogFile(log); } http://git-wip-us.apache.org/repos/asf/hbase/blob/20b6dba9/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java index 0dc9d92..26a94d4 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Assert; @@ -41,27 +40,6 @@ import static org.junit.Assert.fail; public class TestProcedureStoreTracker { private static final Log LOG = LogFactory.getLog(TestProcedureStoreTracker.class); - static class TestProcedure extends Procedure<Void> { - public TestProcedure(long procId) { - setProcId(procId); - } - - @Override - protected Procedure[] execute(Void env) { return null; } - - @Override - protected void rollback(Void env) { /* no-op */ } - - @Override - protected boolean abort(Void env) { return false; } - - @Override - protected void serializeStateData(final OutputStream stream) { /* no-op */ } - - @Override - protected void deserializeStateData(final InputStream stream) { /* no-op */ } - } - @Test public void testSeqInsertAndDelete() { ProcedureStoreTracker tracker = new ProcedureStoreTracker(); @@ -161,13 +139,10 @@ public class TestProcedureStoreTracker { ProcedureStoreTracker tracker = new ProcedureStoreTracker(); assertTrue(tracker.isEmpty()); - Procedure[] procs = new TestProcedure[] { - new TestProcedure(1), new TestProcedure(2), new TestProcedure(3), - new TestProcedure(4), new TestProcedure(5), new TestProcedure(6), - }; + long[] procs = new long[] { 1, 2, 3, 4, 5, 6 }; - tracker.insert(procs[0], null); - tracker.insert(procs[1], new Procedure[] { procs[2], procs[3], procs[4] }); + tracker.insert(procs[0]); + tracker.insert(procs[1], new long[] { procs[2], procs[3], procs[4] }); assertFalse(tracker.isEmpty()); assertTrue(tracker.isUpdated()); @@ -189,11 +164,11 @@ public class TestProcedureStoreTracker { assertTrue(tracker.isUpdated()); for (int i = 0; i < 5; ++i) { - tracker.delete(procs[i].getProcId()); + tracker.delete(procs[i]); assertFalse(tracker.isEmpty()); assertTrue(tracker.isUpdated()); } - tracker.delete(procs[5].getProcId()); + tracker.delete(procs[5]); assertTrue(tracker.isEmpty()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/20b6dba9/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 07c83d3..4fea6de 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -97,7 +97,7 @@ public class TestWALProcedureStore { @Test public void testEmptyRoll() throws Exception { for (int i = 0; i < 10; ++i) { - procStore.periodicRoll(); + procStore.periodicRollForTesting(); } FileStatus[] status = fs.listStatus(logDir); assertEquals(1, status.length); @@ -246,6 +246,36 @@ public class TestWALProcedureStore { assertEquals(1, procStore.getActiveLogs().size()); } + @Test + public void testRollAndRemove() throws IOException { + // Insert something in the log + Procedure proc1 = new TestSequentialProcedure(); + procStore.insert(proc1, null); + + Procedure proc2 = new TestSequentialProcedure(); + procStore.insert(proc2, null); + + // roll the log, now we have 2 + procStore.rollWriterForTesting(); + assertEquals(2, procStore.getActiveLogs().size()); + + // everything will be up to date in the second log + // so we can remove the first one + procStore.update(proc1); + procStore.update(proc2); + assertEquals(1, procStore.getActiveLogs().size()); + + // roll the log, now we have 2 + procStore.rollWriterForTesting(); + assertEquals(2, procStore.getActiveLogs().size()); + + // remove everything active + // so we can remove all the logs + procStore.delete(proc1.getProcId()); + procStore.delete(proc2.getProcId()); + assertEquals(1, procStore.getActiveLogs().size()); + } + private void corruptLog(final FileStatus logFile, final long dropBytes) throws IOException { assertTrue(logFile.getLen() > dropBytes);
