This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 81cbba3 ISSUE #659: Fix Checkpoint logic in SortedLedgerStorage
81cbba3 is described below
commit 81cbba3cf620f2a6df48c0da6ee1ac019de24fbc
Author: Sijie Guo <[email protected]>
AuthorDate: Fri Dec 1 22:05:53 2017 -0700
ISSUE #659: Fix Checkpoint logic in SortedLedgerStorage
Descriptions of the changes in this PR:
- add a test case to reproduce the behavior
Author: Sijie Guo <[email protected]>
Reviewers: Ivan Kelly <[email protected]>, Charan Reddy Guttapalem
<[email protected]>
This closes #677 from sijie/fix_checkpoint, closes #659
---
.../java/org/apache/bookkeeper/bookie/Bookie.java | 18 +-
.../apache/bookkeeper/bookie/CacheCallback.java | 3 +-
.../{CacheCallback.java => Checkpointer.java} | 20 +-
.../apache/bookkeeper/bookie/EntryMemTable.java | 2 +-
.../bookie/InterleavedLedgerStorage.java | 57 ++---
.../apache/bookkeeper/bookie/LedgerStorage.java | 11 +-
.../bookkeeper/bookie/SortedLedgerStorage.java | 55 +++--
.../org/apache/bookkeeper/bookie/SyncThread.java | 89 ++++----
.../apache/bookkeeper/bookie/CompactionTest.java | 68 ++++--
.../apache/bookkeeper/bookie/LedgerCacheTest.java | 19 +-
.../bookkeeper/bookie/LedgerStorageTestBase.java | 88 ++++++++
.../bookie/SortedLedgerStorageCheckpointTest.java | 231 +++++++++++++++++++++
.../bookkeeper/bookie/TestEntryMemTable.java | 11 +-
.../apache/bookkeeper/bookie/TestSyncThread.java | 59 +++---
.../org/apache/bookkeeper/meta/GcLedgersTest.java | 17 +-
.../bookkeeper/meta/LedgerManagerTestCase.java | 15 +-
16 files changed, 579 insertions(+), 184 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 0983395..c08ff46 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -705,9 +705,18 @@ public class Bookie extends BookieCriticalThread {
String ledgerStorageClass = conf.getLedgerStorageClass();
LOG.info("Using ledger storage: {}", ledgerStorageClass);
ledgerStorage =
LedgerStorageFactory.createLedgerStorage(ledgerStorageClass);
- ledgerStorage.initialize(conf, ledgerManager, ledgerDirsManager,
indexDirsManager, checkpointSource,
- statsLogger);
syncThread = new SyncThread(conf, getLedgerDirsListener(),
ledgerStorage, checkpointSource);
+
+ ledgerStorage.initialize(
+ conf,
+ ledgerManager,
+ ledgerDirsManager,
+ indexDirsManager,
+ checkpointSource,
+ syncThread,
+ statsLogger);
+
+
handles = new HandleFactoryImpl(ledgerStorage);
// Expose Stats
@@ -818,11 +827,6 @@ public class Bookie extends BookieCriticalThread {
idxMonitor.start();
}
- // start sync thread first, so during replaying journals, we could do
checkpoint
- // which reduce the chance that we need to replay journals again if
bookie restarted
- // again before finished journal replays.
- syncThread.start();
-
// replay journals
try {
readJournal();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
index df9a848..fb41cc5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
@@ -22,6 +22,7 @@
package org.apache.bookkeeper.bookie;
import java.io.IOException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
/**
* Interface plugged into caching to receive callback notifications.
@@ -30,5 +31,5 @@ public interface CacheCallback {
/**
* Process notification that cache size limit reached.
*/
- void onSizeLimitReached() throws IOException;
+ void onSizeLimitReached(Checkpoint cp) throws IOException;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Checkpointer.java
similarity index 67%
copy from
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
copy to
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Checkpointer.java
index df9a848..967d3e9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Checkpointer.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,19 +15,26 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
package org.apache.bookkeeper.bookie;
-import java.io.IOException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
/**
- * Interface plugged into caching to receive callback notifications.
+ * The instance that is responsible for checkpointing ledger storage.
*/
-public interface CacheCallback {
+public interface Checkpointer {
+
+ Checkpointer NULL = checkpoint -> {
+ // do nothing;
+ };
+
/**
- * Process notification that cache size limit reached.
+ * Start checkpointing for a given <i>checkpoint</i> location.
+ *
+ * @param checkpoint the checkpoint location to checkpoint.
*/
- void onSizeLimitReached() throws IOException;
+ void startCheckpoint(Checkpoint checkpoint);
+
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index c26c8e3..c8f9483 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -309,7 +309,7 @@ public class EntryMemTable {
if (isSizeLimitReached() || (!previousFlushSucceeded.get())) {
Checkpoint cp = snapshot();
if ((null != cp) || (!previousFlushSucceeded.get())) {
- cb.onSizeLimitReached();
+ cb.onSizeLimitReached(cp);
} else {
throttleWriters();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 19a6105..6db3e6d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -21,13 +21,12 @@
package org.apache.bookkeeper.bookie;
-
+import static com.google.common.base.Preconditions.checkNotNull;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -60,33 +59,10 @@ import org.slf4j.LoggerFactory;
public class InterleavedLedgerStorage implements CompactableLedgerStorage,
EntryLogListener {
private static final Logger LOG =
LoggerFactory.getLogger(InterleavedLedgerStorage.class);
- /**
- * Hold the last checkpoint.
- */
- protected static class CheckpointHolder {
- Checkpoint lastCheckpoint = Checkpoint.MAX;
-
- protected synchronized void setNextCheckpoint(Checkpoint cp) {
- if (Checkpoint.MAX.equals(lastCheckpoint) ||
lastCheckpoint.compareTo(cp) < 0) {
- lastCheckpoint = cp;
- }
- }
-
- protected synchronized void clearLastCheckpoint(Checkpoint done) {
- if (0 == lastCheckpoint.compareTo(done)) {
- lastCheckpoint = Checkpoint.MAX;
- }
- }
-
- protected synchronized Checkpoint getLastCheckpoint() {
- return lastCheckpoint;
- }
- }
-
EntryLogger entryLogger;
LedgerCache ledgerCache;
- private CheckpointSource checkpointSource;
- protected final CheckpointHolder checkpointHolder = new CheckpointHolder();
+ protected CheckpointSource checkpointSource;
+ protected Checkpointer checkpointer;
private final CopyOnWriteArrayList<LedgerDeletionListener>
ledgerDeletionListeners =
Lists.newCopyOnWriteArrayList();
@@ -110,12 +86,19 @@ public class InterleavedLedgerStorage implements
CompactableLedgerStorage, Entry
}
@Override
- public void initialize(ServerConfiguration conf, LedgerManager
ledgerManager,
- LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
- CheckpointSource checkpointSource, StatsLogger
statsLogger)
+ public void initialize(ServerConfiguration conf,
+ LedgerManager ledgerManager,
+ LedgerDirsManager ledgerDirsManager,
+ LedgerDirsManager indexDirsManager,
+ CheckpointSource checkpointSource,
+ Checkpointer checkpointer,
+ StatsLogger statsLogger)
throws IOException {
+ checkNotNull(checkpointSource, "invalid null checkpoint source");
+ checkNotNull(checkpointer, "invalid null checkpointer");
this.checkpointSource = checkpointSource;
+ this.checkpointer = checkpointer;
entryLogger = new EntryLogger(conf, ledgerDirsManager, this);
ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
null == indexDirsManager ? ledgerDirsManager :
indexDirsManager, statsLogger);
@@ -361,21 +344,12 @@ public class InterleavedLedgerStorage implements
CompactableLedgerStorage, Entry
}
@Override
- public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException {
- Checkpoint lastCheckpoint = checkpointHolder.getLastCheckpoint();
- // if checkpoint is less than last checkpoint, we don't need to do
checkpoint again.
- if (lastCheckpoint.compareTo(checkpoint) > 0) {
- return lastCheckpoint;
- }
+ public void checkpoint(Checkpoint checkpoint) throws IOException {
// we don't need to check somethingWritten since checkpoint
// is scheduled when rotating an entry logger file. and we could
// not set somethingWritten to false after checkpoint, since
// current entry logger file isn't flushed yet.
flushOrCheckpoint(true);
- // after the ledger storage finished checkpointing, try to clear the
done checkpoint
-
- checkpointHolder.clearLastCheckpoint(lastCheckpoint);
- return lastCheckpoint;
}
@Override
@@ -465,6 +439,7 @@ public class InterleavedLedgerStorage implements
CompactableLedgerStorage, Entry
// TODO: we could consider removing checkpointSource and
CheckpointSource#newCheckpoint
// later if we provide some kind of LSN (Log/Journal Sequence Number)
// mechanism when adding entry. {@link
https://github.com/apache/bookkeeper/issues/279}
- checkpointHolder.setNextCheckpoint(checkpointSource.newCheckpoint());
+ Checkpoint checkpoint = checkpointSource.newCheckpoint();
+ checkpointer.startCheckpoint(checkpoint);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 0207ba0..d2055a8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -42,8 +42,13 @@ public interface LedgerStorage {
* @param ledgerManager
* @param ledgerDirsManager
*/
- void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
- LedgerDirsManager indexDirsManager, CheckpointSource
checkpointSource, StatsLogger statsLogger)
+ void initialize(ServerConfiguration conf,
+ LedgerManager ledgerManager,
+ LedgerDirsManager ledgerDirsManager,
+ LedgerDirsManager indexDirsManager,
+ CheckpointSource checkpointSource,
+ Checkpointer checkpointer,
+ StatsLogger statsLogger)
throws IOException;
/**
@@ -138,7 +143,7 @@ public interface LedgerStorage {
* @throws IOException
* @return the checkpoint that the ledger storage finished.
*/
- Checkpoint checkpoint(Checkpoint checkpoint) throws IOException;
+ void checkpoint(Checkpoint checkpoint) throws IOException;
/**
* @param ledgerId
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index ea1ff5a..972f085 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -20,6 +20,7 @@
*/
package org.apache.bookkeeper.bookie;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
@@ -53,11 +54,22 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
}
@Override
- public void initialize(ServerConfiguration conf, LedgerManager
ledgerManager,
- LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
- final CheckpointSource checkpointSource,
StatsLogger statsLogger)
+ public void initialize(ServerConfiguration conf,
+ LedgerManager ledgerManager,
+ LedgerDirsManager ledgerDirsManager,
+ LedgerDirsManager indexDirsManager,
+ CheckpointSource checkpointSource,
+ Checkpointer checkpointer,
+ StatsLogger statsLogger)
throws IOException {
- super.initialize(conf, ledgerManager, ledgerDirsManager,
indexDirsManager, checkpointSource, statsLogger);
+ super.initialize(
+ conf,
+ ledgerManager,
+ ledgerDirsManager,
+ indexDirsManager,
+ checkpointSource,
+ checkpointer,
+ statsLogger);
this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger);
this.scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
@@ -65,6 +77,11 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
.setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) /
2).build());
}
+ @VisibleForTesting
+ ScheduledExecutorService getScheduler() {
+ return scheduler;
+ }
+
@Override
public void start() {
try {
@@ -146,14 +163,16 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
}
@Override
- public Checkpoint checkpoint(final Checkpoint checkpoint) throws
IOException {
- Checkpoint lastCheckpoint = checkpointHolder.getLastCheckpoint();
- // if checkpoint is less than last checkpoint, we don't need to do
checkpoint again.
- if (lastCheckpoint.compareTo(checkpoint) > 0) {
- return lastCheckpoint;
+ public void checkpoint(final Checkpoint checkpoint) throws IOException {
+ long numBytesFlushed = memTable.flush(this, checkpoint);
+ if (numBytesFlushed > 0) {
+ // if bytes are added between previous flush and this checkpoint,
+ // it means bytes might live at current active entry log, we need
+ // roll current entry log and then issue checkpoint to underlying
+ // interleaved ledger storage.
+ entryLogger.rollLog();
}
- memTable.flush(this, checkpoint);
- return super.checkpoint(checkpoint);
+ super.checkpoint(checkpoint);
}
@Override
@@ -170,7 +189,8 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
// CacheCallback functions.
@Override
- public void onSizeLimitReached() throws IOException {
+ public void onSizeLimitReached(final Checkpoint cp) throws IOException {
+ LOG.info("Reached size {}", cp);
// when size limit reached, we get the previous checkpoint from
snapshot mem-table.
// at this point, we are safer to schedule a checkpoint, since the
entries added before
// this checkpoint are already written to entry logger.
@@ -194,8 +214,9 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
// for performance consideration: since we don't wanna
checkpoint a new log file that ledger
// storage is writing to.
if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush
!= logIdBeforeFlush) {
- entryLogger.rollLog();
LOG.info("Rolling entry logger since it reached size
limitation");
+ entryLogger.rollLog();
+ checkpointer.startCheckpoint(cp);
}
} catch (IOException e) {
// TODO: if we failed to flush data, we should switch the
bookie back to readonly mode
@@ -205,4 +226,12 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
}
});
}
+
+ @Override
+ public void onRotateEntryLog() {
+ // override the behavior at interleaved ledger storage.
+ // we don't trigger any checkpoint logic when an entry log file is
rotated, because entry log file rotation
+ // can happen because of compaction. in a sorted ledger storage,
checkpoint should happen after the data is
+ // flushed to the entry log file.
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index ca001a4..44bf6ae 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -24,18 +24,16 @@ package org.apache.bookkeeper.bookie;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
-import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.util.MathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* SyncThread is a background thread which help checkpointing ledger storage
@@ -54,8 +52,8 @@ import org.slf4j.LoggerFactory;
* for manual recovery in critical disaster.
* </p>
*/
-class SyncThread {
- private static final Logger LOG =
LoggerFactory.getLogger(SyncThread.class);
+@Slf4j
+class SyncThread implements Checkpointer {
final ScheduledExecutorService executor;
final int flushInterval;
@@ -78,47 +76,43 @@ class SyncThread {
.setNameFormat("SyncThread-" + conf.getBookiePort() + "-%d");
this.executor =
Executors.newSingleThreadScheduledExecutor(tfb.build());
flushInterval = conf.getFlushInterval();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Flush Interval : {}", flushInterval);
+ if (log.isDebugEnabled()) {
+ log.debug("Flush Interval : {}", flushInterval);
}
}
- void start() {
- executor.scheduleAtFixedRate(new Runnable() {
- public void run() {
- try {
- synchronized (suspensionLock) {
- while (suspended) {
- try {
- suspensionLock.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- continue;
- }
+ @Override
+ public void startCheckpoint(Checkpoint checkpoint) {
+ executor.submit(() -> {
+ try {
+ synchronized (suspensionLock) {
+ while (suspended) {
+ try {
+ suspensionLock.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ continue;
}
}
- if (!disableCheckpoint) {
- checkpoint(checkpointSource.newCheckpoint());
- }
- } catch (Throwable t) {
- LOG.error("Exception in SyncThread", t);
- dirsListener.fatalError();
}
+ if (!disableCheckpoint) {
+ checkpoint(checkpoint);
+ }
+ } catch (Throwable t) {
+ log.error("Exception in SyncThread", t);
+ dirsListener.fatalError();
}
- }, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
+ });
}
public Future<Void> requestFlush() {
- return executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- try {
- flush();
- } catch (Throwable t) {
- LOG.error("Exception flushing ledgers ", t);
- }
- return null;
+ return executor.submit(() -> {
+ try {
+ flush();
+ } catch (Throwable t) {
+ log.error("Exception flushing ledgers ", t);
}
+ return null;
});
}
@@ -127,11 +121,11 @@ class SyncThread {
try {
ledgerStorage.flush();
} catch (NoWritableLedgerDirException e) {
- LOG.error("No writeable ledger directories", e);
+ log.error("No writeable ledger directories", e);
dirsListener.allDisksFull();
return;
} catch (IOException e) {
- LOG.error("Exception flushing ledgers", e);
+ log.error("Exception flushing ledgers", e);
return;
}
@@ -139,32 +133,37 @@ class SyncThread {
return;
}
- LOG.info("Flush ledger storage at checkpoint {}.", checkpoint);
+ log.info("Flush ledger storage at checkpoint {}.", checkpoint);
try {
checkpointSource.checkpointComplete(checkpoint, false);
} catch (IOException e) {
- LOG.error("Exception marking checkpoint as complete", e);
+ log.error("Exception marking checkpoint as complete", e);
dirsListener.allDisksFull();
}
}
@VisibleForTesting
public void checkpoint(Checkpoint checkpoint) {
+ if (null == checkpoint) {
+ // do nothing if checkpoint is null
+ return;
+ }
+
try {
- checkpoint = ledgerStorage.checkpoint(checkpoint);
+ ledgerStorage.checkpoint(checkpoint);
} catch (NoWritableLedgerDirException e) {
- LOG.error("No writeable ledger directories", e);
+ log.error("No writeable ledger directories", e);
dirsListener.allDisksFull();
return;
} catch (IOException e) {
- LOG.error("Exception flushing ledgers", e);
+ log.error("Exception flushing ledgers", e);
return;
}
try {
checkpointSource.checkpointComplete(checkpoint, true);
} catch (IOException e) {
- LOG.error("Exception marking checkpoint as complete", e);
+ log.error("Exception marking checkpoint as complete", e);
dirsListener.allDisksFull();
}
}
@@ -197,13 +196,13 @@ class SyncThread {
// shutdown sync thread
void shutdown() throws InterruptedException {
- LOG.info("Shutting down SyncThread");
+ log.info("Shutting down SyncThread");
requestFlush();
executor.shutdown();
long start = MathUtils.now();
while (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
long now = MathUtils.now();
- LOG.info("SyncThread taking a long time to shutdown. Has taken {}"
+ log.info("SyncThread taking a long time to shutdown. Has taken {}"
+ " seconds so far", now - start);
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 17cc04f..08bb131 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -20,9 +20,14 @@
*/
package org.apache.bookkeeper.bookie;
+import static
org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTED_SUFFIX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -34,7 +39,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
-
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.client.LedgerEntry;
@@ -56,14 +60,11 @@ import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.TestUtils;
import org.apache.bookkeeper.versioning.Version;
import org.apache.zookeeper.AsyncCallback;
-
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static
org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTED_SUFFIX;
-import static org.junit.Assert.*;
/**
* This class tests the entry log compaction functionality.
*/
@@ -240,9 +241,14 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
Bookie.checkDirectoryStructure(dir);
}
InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
- storage.initialize(conf,
- LedgerManagerFactory.newLedgerManagerFactory(conf,
zkc).newLedgerManager(),
- dirManager, dirManager, cp, NullStatsLogger.INSTANCE);
+ storage.initialize(
+ conf,
+ LedgerManagerFactory.newLedgerManagerFactory(conf,
zkc).newLedgerManager(),
+ dirManager,
+ dirManager,
+ cp,
+ Checkpointer.NULL,
+ NullStatsLogger.INSTANCE);
storage.start();
long startTime = MathUtils.now();
storage.gcThread.enableForceGC();
@@ -623,7 +629,14 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
assertFalse("Log shouldnt exist", log0.exists());
InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
- storage.initialize(conf, manager, dirs, dirs, checkpointSource,
NullStatsLogger.INSTANCE);
+ storage.initialize(
+ conf,
+ manager,
+ dirs,
+ dirs,
+ checkpointSource,
+ Checkpointer.NULL,
+ NullStatsLogger.INSTANCE);
ledgers.add(1L);
ledgers.add(2L);
ledgers.add(3L);
@@ -642,7 +655,13 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
ledgers.remove(3L);
storage = new InterleavedLedgerStorage();
- storage.initialize(conf, manager, dirs, dirs, checkpointSource,
NullStatsLogger.INSTANCE);
+ storage.initialize(
+ conf,
+ manager,
+ dirs, dirs,
+ checkpointSource,
+ Checkpointer.NULL,
+ NullStatsLogger.INSTANCE);
storage.start();
for (int i = 0; i < 10; i++) {
if (!log0.exists()) {
@@ -658,7 +677,14 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
storage.addEntry(genEntry(4, 1, ENTRY_SIZE)); // force ledger 1 page
to flush
storage = new InterleavedLedgerStorage();
- storage.initialize(conf, manager, dirs, dirs, checkpointSource,
NullStatsLogger.INSTANCE);
+ storage.initialize(
+ conf,
+ manager,
+ dirs,
+ dirs,
+ checkpointSource,
+ Checkpointer.NULL,
+ NullStatsLogger.INSTANCE);
storage.getEntry(1, 1); // entry should exist
}
@@ -755,7 +781,14 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
}
};
InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
- storage.initialize(conf, manager, dirs, dirs, checkpointSource,
NullStatsLogger.INSTANCE);
+ storage.initialize(
+ conf,
+ manager,
+ dirs,
+ dirs,
+ checkpointSource,
+ Checkpointer.NULL,
+ NullStatsLogger.INSTANCE);
double threshold = 0.1;
// shouldn't throw exception
@@ -803,9 +836,14 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
Bookie.checkDirectoryStructure(dir);
}
InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
- storage.initialize(conf,
- LedgerManagerFactory.newLedgerManagerFactory(conf,
zkc).newLedgerManager(),
- dirManager, dirManager, cp, NullStatsLogger.INSTANCE);
+ storage.initialize(
+ conf,
+ LedgerManagerFactory.newLedgerManagerFactory(conf,
zkc).newLedgerManager(),
+ dirManager,
+ dirManager,
+ cp,
+ Checkpointer.NULL,
+ NullStatsLogger.INSTANCE);
storage.start();
// test suspend Major GC.
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 5f62ecc..8de2bfc 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -475,10 +475,21 @@ public class LedgerCacheTest {
}
@Override
- public void initialize(ServerConfiguration conf, LedgerManager
ledgerManager,
- LedgerDirsManager ledgerDirsManager, LedgerDirsManager
indexDirsManager,
- final CheckpointSource checkpointSource, StatsLogger
statsLogger) throws IOException {
- super.initialize(conf, ledgerManager, ledgerDirsManager,
indexDirsManager, checkpointSource, statsLogger);
+ public void initialize(ServerConfiguration conf,
+ LedgerManager ledgerManager,
+ LedgerDirsManager ledgerDirsManager,
+ LedgerDirsManager indexDirsManager,
+ CheckpointSource checkpointSource,
+ Checkpointer checkpointer,
+ StatsLogger statsLogger) throws IOException {
+ super.initialize(
+ conf,
+ ledgerManager,
+ ledgerDirsManager,
+ indexDirsManager,
+ checkpointSource,
+ checkpointer,
+ statsLogger);
this.memTable = new EntryMemTable(conf, checkpointSource,
statsLogger) {
@Override
boolean isSizeLimitReached() {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTestBase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTestBase.java
new file mode 100644
index 0000000..29268eb
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTestBase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+/**
+ * Test the checkpoint logic in bookies.
+ */
+@Slf4j
+public abstract class LedgerStorageTestBase {
+
+ @Rule
+ public TestName testName = new TestName();
+
+ protected ServerConfiguration conf;
+ protected File journalDir, ledgerDir;
+ protected LedgerDirsManager ledgerDirsManager;
+
+ private File createTempDir(String suffix) throws Exception {
+ File dir = File.createTempFile(testName.getMethodName(), suffix);
+ dir.delete();
+ dir.mkdirs();
+ return dir;
+ }
+
+ protected LedgerStorageTestBase() {
+ conf = TestBKConfiguration.newServerConfiguration();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ journalDir = createTempDir("journal");
+ ledgerDir = createTempDir("ledger");
+
+ // create current directories
+ Bookie.getCurrentDirectory(journalDir).mkdir();
+ Bookie.getCurrentDirectory(ledgerDir).mkdir();
+
+ // build the configuration
+ conf.setZkServers(null);
+ conf.setJournalDirName(journalDir.getPath());
+ conf.setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+ // build the ledger monitor
+ DiskChecker checker = new DiskChecker(
+ conf.getDiskUsageThreshold(),
+ conf.getDiskUsageWarnThreshold());
+ ledgerDirsManager = new LedgerDirsManager(
+ conf,
+ conf.getLedgerDirs(),
+ checker);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ FileUtils.deleteDirectory(journalDir);
+ FileUtils.deleteDirectory(ledgerDir);
+ }
+
+
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
new file mode 100644
index 0000000..c70b0e0
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the checkpoint logic of {@link SortedLedgerStorage}.
+ */
+@Slf4j
+public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
+
+    /**
+     * Upper bound (in seconds) on waiting for a checkpoint, so that a regression
+     * fails the test instead of hanging it forever.
+     */
+    private static final long CHECKPOINT_TIMEOUT_SECS = 30;
+
+    /**
+     * A checkpoint identified by an offset. Checkpoints are ordered by offset and
+     * always sort before {@link Checkpoint#MAX}.
+     */
+    @Data
+    @RequiredArgsConstructor
+    @ToString
+    @EqualsAndHashCode
+    private static class TestCheckpoint implements Checkpoint {
+
+        private final long offset;
+
+        @Override
+        public int compareTo(Checkpoint o) {
+            // every test checkpoint sorts before the MAX sentinel
+            if (Checkpoint.MAX == o) {
+                return -1;
+            }
+
+            // anything else is expected to be another TestCheckpoint
+            TestCheckpoint other = (TestCheckpoint) o;
+            return Long.compare(offset, other.offset);
+        }
+
+    }
+
+    /**
+     * A checkpoint source whose offset is advanced manually by the test; it logs
+     * checkpoint creation and completion.
+     */
+    @RequiredArgsConstructor
+    private static class TestCheckpointSource implements CheckpointSource {
+
+        private long currentOffset = 0;
+
+        void advanceOffset(long numBytes) {
+            currentOffset += numBytes;
+        }
+
+        @Override
+        public Checkpoint newCheckpoint() {
+            TestCheckpoint cp = new TestCheckpoint(currentOffset);
+            log.info("New checkpoint : {}", cp);
+            return cp;
+        }
+
+        @Override
+        public void checkpointComplete(Checkpoint checkpoint, boolean compact)
+                throws IOException {
+            log.info("Complete checkpoint : {}", checkpoint);
+        }
+    }
+
+    private SortedLedgerStorage storage;
+    private Checkpointer checkpointer;
+    // checkpoints that the test checkpointer has finished, in completion order
+    private final LinkedBlockingQueue<Checkpoint> checkpoints;
+    private final TestCheckpointSource checkpointSrc = new TestCheckpointSource();
+
+    public SortedLedgerStorageCheckpointTest() {
+        super();
+        // smallest possible entry log size limit (presumably so entry logs roll over readily)
+        conf.setEntryLogSizeLimit(1);
+        this.checkpoints = new LinkedBlockingQueue<>();
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        this.storage = new SortedLedgerStorage();
+        // the checkpointer runs the storage checkpoint on the storage's own scheduler and
+        // records the checkpoint in the queue only after it succeeded
+        this.checkpointer = checkpoint -> storage.getScheduler().submit(() -> {
+            log.info("Checkpoint the storage at {}", checkpoint);
+            try {
+                storage.checkpoint(checkpoint);
+                checkpoints.add(checkpoint);
+            } catch (IOException e) {
+                log.error("Failed to checkpoint at {}", checkpoint, e);
+            }
+        });
+        this.storage.initialize(
+            conf,
+            mock(LedgerManager.class),
+            ledgerDirsManager,
+            ledgerDirsManager,
+            checkpointSrc,
+            checkpointer,
+            NullStatsLogger.INSTANCE);
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        if (null != storage) {
+            storage.shutdown();
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Build a 32-byte entry: ledger id, entry id, last-add-confirmed (entry id - 1)
+     * followed by one long of payload.
+     */
+    ByteBuf prepareEntry(long ledgerId, long entryId) {
+        ByteBuf entry = Unpooled.buffer(4 * Long.BYTES);
+        // ledger id, entry id, lac
+        entry.writeLong(ledgerId);
+        entry.writeLong(entryId);
+        entry.writeLong(entryId - 1);
+        // data
+        entry.writeLong(entryId);
+        return entry;
+    }
+
+    /**
+     * Wait for the next checkpoint recorded by the test checkpointer.
+     *
+     * @return the completed checkpoint, never {@code null}
+     * @throws InterruptedException if interrupted while waiting
+     */
+    private Checkpoint awaitCheckpoint() throws InterruptedException {
+        Checkpoint completed = checkpoints.poll(CHECKPOINT_TIMEOUT_SECS, TimeUnit.SECONDS);
+        assertTrue(
+            "No checkpoint completed within " + CHECKPOINT_TIMEOUT_SECS + " seconds",
+            null != completed);
+        return completed;
+    }
+
+    @Test
+    public void testCheckpoint() throws Exception {
+        // memory table holds the first checkpoint, but it is not completed yet.
+        Checkpoint memtableCp = storage.memTable.kvmap.cp;
+        assertEquals(new TestCheckpoint(0), memtableCp);
+
+        // write entries into ledger storage
+        long lid = System.currentTimeMillis();
+        storage.setMasterKey(lid, new byte[0]);
+        for (int i = 0; i < 20; i++) {
+            storage.addEntry(prepareEntry(lid, i));
+        }
+        // simulate journal persists the entries in journal;
+        checkpointSrc.advanceOffset(100);
+
+        // memory table holds the first checkpoint, but it is not completed yet.
+        memtableCp = storage.memTable.kvmap.cp;
+        assertEquals(new TestCheckpoint(0), memtableCp);
+
+        // trigger a memtable flush
+        storage.onSizeLimitReached(checkpointSrc.newCheckpoint());
+        // wait for checkpoint to complete
+        awaitCheckpoint();
+        assertEquals(new TestCheckpoint(100), storage.memTable.kvmap.cp);
+        assertEquals(0, storage.memTable.kvmap.size());
+    }
+
+    @Test
+    public void testCheckpointAfterEntryLogRotated() throws Exception {
+        // memory table holds the first checkpoint, but it is not completed yet.
+        Checkpoint memtableCp = storage.memTable.kvmap.cp;
+        assertEquals(new TestCheckpoint(0), memtableCp);
+
+        // write entries into ledger storage
+        long lid = System.currentTimeMillis();
+        storage.setMasterKey(lid, new byte[0]);
+        for (int i = 0; i < 20; i++) {
+            storage.addEntry(prepareEntry(lid, i));
+        }
+        // simulate journal persists the entries in journal;
+        checkpointSrc.advanceOffset(100);
+
+        // memory table holds the first checkpoint, but it is not completed yet.
+        memtableCp = storage.memTable.kvmap.cp;
+        assertEquals(new TestCheckpoint(0), memtableCp);
+        assertEquals(20, storage.memTable.kvmap.size());
+
+        // park a task on the storage scheduler until the entry log has been rotated
+        final CountDownLatch readyLatch = new CountDownLatch(1);
+        storage.getScheduler().submit(() -> {
+            try {
+                readyLatch.await();
+            } catch (InterruptedException e) {
+                // keep the interrupt visible instead of silently dropping it
+                Thread.currentThread().interrupt();
+            }
+        });
+
+        // simulate entry log is rotated (due to compaction)
+        storage.entryLogger.rollLog();
+        long leastUnflushedLogId = storage.entryLogger.getLeastUnflushedLogId();
+        long currentLogId = storage.entryLogger.getCurrentLogId();
+        log.info("Least unflushed entry log : current = {}, leastUnflushed = {}",
+            currentLogId, leastUnflushedLogId);
+
+        // rotating the entry log alone must neither checkpoint nor flush the memtable
+        readyLatch.countDown();
+        assertNull(checkpoints.poll());
+        assertEquals(new TestCheckpoint(0), storage.memTable.kvmap.cp);
+        assertEquals(20, storage.memTable.kvmap.size());
+
+        // trigger a memtable flush
+        storage.onSizeLimitReached(checkpointSrc.newCheckpoint());
+        assertEquals(new TestCheckpoint(100), awaitCheckpoint());
+
+        // all the entries are flushed out
+        assertEquals(new TestCheckpoint(100), storage.memTable.kvmap.cp);
+        assertEquals(0, storage.memTable.kvmap.size());
+        assertTrue(
+            "current log " + currentLogId + " contains entries added from memtable should be forced to disk"
+            + " but least unflushed log is " + storage.entryLogger.getLeastUnflushedLogId(),
+            storage.entryLogger.getLeastUnflushedLogId() > currentLogId);
+    }
+
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
index 0ca108e..ec60799 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
@@ -18,20 +18,21 @@
*/
package org.apache.bookkeeper.bookie;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.HashSet;
-
+import java.util.Random;
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.Test;
import org.junit.Before;
-import java.nio.ByteBuffer;
-import java.util.Random;
public class TestEntryMemTable implements CacheCallback, SkipListFlusher,
CheckpointSource {
@@ -92,7 +93,7 @@ public class TestEntryMemTable implements CacheCallback,
SkipListFlusher, Checkp
}
@Override
- public void onSizeLimitReached() throws IOException {
+ public void onSizeLimitReached(Checkpoint cp) throws IOException {
// No-op
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index 0d8f8a4..5f30557 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -21,7 +21,6 @@
package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
-
import java.io.File;
import java.io.IOException;
import java.util.Observable;
@@ -34,7 +33,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -101,7 +99,7 @@ public class TestSyncThread {
}
@Override
- public Checkpoint checkpoint(Checkpoint checkpoint)
+ public void checkpoint(Checkpoint checkpoint)
throws IOException {
checkpointCalledLatch.countDown();
try {
@@ -111,27 +109,24 @@ public class TestSyncThread {
LOG.error("Interrupted in checkpoint thread", ie);
failedSomewhere.set(true);
}
- return checkpoint;
}
};
final SyncThread t = new SyncThread(conf, listener, storage,
checkpointSource);
- t.start();
+ t.startCheckpoint(Checkpoint.MAX);
assertTrue("Checkpoint should have been called",
checkpointCalledLatch.await(10, TimeUnit.SECONDS));
- Future<Boolean> done = executor.submit(new Callable<Boolean>() {
- public Boolean call() {
- try {
- t.shutdown();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- LOG.error("Interrupted shutting down sync thread", ie);
- failedSomewhere.set(true);
- return false;
- }
- return true;
- }
- });
+ Future<Boolean> done = executor.submit(() -> {
+ try {
+ t.shutdown();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted shutting down sync thread", ie);
+ failedSomewhere.set(true);
+ return false;
+ }
+ return true;
+ });
checkpointLatch.countDown();
assertFalse("Shutdown shouldn't have finished", done.isDone());
assertTrue("Flush should have been called",
@@ -160,14 +155,13 @@ public class TestSyncThread {
final AtomicInteger checkpointCount = new AtomicInteger(0);
LedgerStorage storage = new DummyLedgerStorage() {
@Override
- public Checkpoint checkpoint(Checkpoint checkpoint)
+ public void checkpoint(Checkpoint checkpoint)
throws IOException {
checkpointCount.incrementAndGet();
- return checkpoint;
}
};
final SyncThread t = new SyncThread(conf, listener, storage,
checkpointSource);
- t.start();
+ t.startCheckpoint(Checkpoint.MAX);
while (checkpointCount.get() == 0) {
Thread.sleep(flushInterval);
}
@@ -175,6 +169,7 @@ public class TestSyncThread {
Thread.sleep(flushInterval);
int count = checkpointCount.get();
for (int i = 0; i < 10; i++) {
+ t.startCheckpoint(Checkpoint.MAX);
assertEquals("Checkpoint count shouldn't change", count,
checkpointCount.get());
}
t.resumeSync();
@@ -210,13 +205,13 @@ public class TestSyncThread {
LedgerStorage storage = new DummyLedgerStorage() {
@Override
- public Checkpoint checkpoint(Checkpoint checkpoint)
+ public void checkpoint(Checkpoint checkpoint)
throws IOException {
throw new RuntimeException("Fatal error in sync thread");
}
};
final SyncThread t = new SyncThread(conf, listener, storage,
checkpointSource);
- t.start();
+ t.startCheckpoint(Checkpoint.MAX);
assertTrue("Should have called fatal error", fatalLatch.await(10,
TimeUnit.SECONDS));
t.shutdown();
}
@@ -242,13 +237,13 @@ public class TestSyncThread {
LedgerStorage storage = new DummyLedgerStorage() {
@Override
- public Checkpoint checkpoint(Checkpoint checkpoint)
+ public void checkpoint(Checkpoint checkpoint)
throws IOException {
throw new NoWritableLedgerDirException("Disk full error in
sync thread");
}
};
final SyncThread t = new SyncThread(conf, listener, storage,
checkpointSource);
- t.start();
+ t.startCheckpoint(Checkpoint.MAX);
assertTrue("Should have disk full error", diskFullLatch.await(10,
TimeUnit.SECONDS));
t.shutdown();
}
@@ -267,9 +262,14 @@ public class TestSyncThread {
private static class DummyLedgerStorage implements LedgerStorage {
@Override
- public void initialize(ServerConfiguration conf, LedgerManager
ledgerManager,
- LedgerDirsManager ledgerDirsManager, LedgerDirsManager
indexDirsManager,
- CheckpointSource checkpointSource, StatsLogger statsLogger)
+ public void initialize(
+ ServerConfiguration conf,
+ LedgerManager ledgerManager,
+ LedgerDirsManager ledgerDirsManager,
+ LedgerDirsManager indexDirsManager,
+ CheckpointSource checkpointSource,
+ Checkpointer checkpointer,
+ StatsLogger statsLogger)
throws IOException {
}
@@ -346,9 +346,8 @@ public class TestSyncThread {
}
@Override
- public Checkpoint checkpoint(Checkpoint checkpoint)
+ public void checkpoint(Checkpoint checkpoint)
throws IOException {
- return checkpoint;
}
@Override
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index ecdfa77..ad21fc1 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -48,10 +47,10 @@ import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.EntryLogger;
@@ -320,9 +319,14 @@ public class GcLedgersTest extends LedgerManagerTestCase {
class MockLedgerStorage implements CompactableLedgerStorage {
@Override
- public void initialize(ServerConfiguration conf, LedgerManager
ledgerManager,
- LedgerDirsManager ledgerDirsManager, LedgerDirsManager
indexDirsManager,
- CheckpointSource checkpointSource, StatsLogger statsLogger)
throws IOException {
+ public void initialize(
+ ServerConfiguration conf,
+ LedgerManager ledgerManager,
+ LedgerDirsManager ledgerDirsManager,
+ LedgerDirsManager indexDirsManager,
+ CheckpointSource checkpointSource,
+ Checkpointer checkpointer,
+ StatsLogger statsLogger) throws IOException {
}
@Override
@@ -386,8 +390,7 @@ public class GcLedgersTest extends LedgerManagerTestCase {
}
@Override
- public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException
{
- return null;
+ public void checkpoint(Checkpoint checkpoint) throws IOException {
}
@Override
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index c6f2a36..5d357c3 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -32,6 +32,7 @@ import java.util.Observer;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.EntryLogger;
@@ -112,9 +113,14 @@ public abstract class LedgerManagerTestCase extends
BookKeeperClusterTestCase {
public class MockLedgerStorage implements CompactableLedgerStorage {
@Override
- public void initialize(ServerConfiguration conf, LedgerManager
ledgerManager,
- LedgerDirsManager ledgerDirsManager, LedgerDirsManager
indexDirsManager,
- CheckpointSource checkpointSource, StatsLogger statsLogger)
throws IOException {
+ public void initialize(
+ ServerConfiguration conf,
+ LedgerManager ledgerManager,
+ LedgerDirsManager ledgerDirsManager,
+ LedgerDirsManager indexDirsManager,
+ CheckpointSource checkpointSource,
+ Checkpointer checkpointer,
+ StatsLogger statsLogger) throws IOException {
}
@Override
@@ -169,8 +175,7 @@ public abstract class LedgerManagerTestCase extends
BookKeeperClusterTestCase {
}
@Override
- public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException
{
- return null;
+ public void checkpoint(Checkpoint checkpoint) throws IOException {
}
@Override
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].