This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 6a5f636 ISSUE280: Add StateManager to manage Bookie's state, and use
it to turn bookie into readonly when sortedLedgerStorage failed to flush data
6a5f636 is described below
commit 6a5f6367a77094406ff0c938cf6b7f6d804451e1
Author: Arvin <[email protected]>
AuthorDate: Mon Jan 8 15:56:40 2018 -0800
ISSUE280: Add StateManager to manage Bookie's state, and use it to turn
bookie into readonly when sortedLedgerStorage failed to flush data
Descriptions of the changes in this PR:
Add SortedLedgerStorageListener to implement, and add a test case to verify
it.
Master Issue: #280
Author: Arvin <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>, Ivan Kelly <[email protected]>
This closes #873 from ArvinDevel/issue280
---
.../java/org/apache/bookkeeper/bookie/Bookie.java | 258 +++--------------
.../org/apache/bookkeeper/bookie/BookieShell.java | 8 +-
.../bookkeeper/bookie/BookieStateManager.java | 310 +++++++++++++++++++++
.../bookie/InterleavedLedgerStorage.java | 1 +
.../apache/bookkeeper/bookie/LedgerStorage.java | 1 +
.../apache/bookkeeper/bookie/ReadOnlyBookie.java | 29 +-
.../bookkeeper/bookie/SortedLedgerStorage.java | 12 +-
.../org/apache/bookkeeper/bookie/StateManager.java | 97 +++++++
.../bookie/storage/ldb/DbLedgerStorage.java | 5 +-
.../bookie/BookieInitializationTest.java | 33 +--
.../apache/bookkeeper/bookie/CompactionTest.java | 11 +-
.../apache/bookkeeper/bookie/LedgerCacheTest.java | 66 ++++-
.../bookie/SortedLedgerStorageCheckpointTest.java | 2 +
.../apache/bookkeeper/bookie/StateManagerTest.java | 258 +++++++++++++++++
.../apache/bookkeeper/bookie/TestSyncThread.java | 1 +
.../bookie/storage/ldb/ConversionRollbackTest.java | 6 +-
.../bookie/storage/ldb/ConversionTest.java | 12 +-
.../storage/ldb/LocationsIndexRebuildTest.java | 4 +-
.../org/apache/bookkeeper/meta/GcLedgersTest.java | 2 +
.../bookkeeper/meta/LedgerManagerTestCase.java | 2 +
.../replication/AuditorLedgerCheckerTest.java | 6 +-
.../bookkeeper/test/BookKeeperClusterTestCase.java | 2 +-
.../apache/bookkeeper/test/ReadOnlyBookieTest.java | 4 +-
23 files changed, 844 insertions(+), 286 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index aaabdb5..0b57d56 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -30,13 +30,11 @@ import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_BYTES;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_STATUS;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_BYTES;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.File;
@@ -55,11 +53,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -82,7 +76,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNS;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -133,24 +126,12 @@ public class Bookie extends BookieCriticalThread {
// Registration Manager for managing registration
RegistrationManager registrationManager;
- // Running flag
- private volatile boolean running = false;
- // Flag identify whether it is in shutting down progress
- private volatile boolean shuttingdown = false;
- // Bookie status
- private final BookieStatus bookieStatus = new BookieStatus();
private int exitCode = ExitCode.OK;
private final ConcurrentLongHashMap<byte[]> masterKeyCache = new
ConcurrentLongHashMap<>();
- protected final String bookieId;
-
- private final AtomicBoolean rmRegistered = new AtomicBoolean(false);
- protected final AtomicBoolean forceReadOnly = new AtomicBoolean(false);
- // executor to manage the state changes for a bookie.
- final ExecutorService stateService = Executors.newSingleThreadExecutor(
- new
ThreadFactoryBuilder().setNameFormat("BookieStateService-%d").build());
+ protected StateManager stateManager;
// Expose Stats
private final StatsLogger statsLogger;
@@ -246,12 +227,13 @@ public class Bookie extends BookieCriticalThread {
}
@VisibleForTesting
- public void setRegistrationManager(RegistrationManager rm) {
- this.registrationManager = rm;
+ public synchronized void setRegistrationManager(RegistrationManager rm) {
+ this.registrationManager = rm;
+ this.getStateManager().setRegistrationManager(rm);
}
@VisibleForTesting
- public RegistrationManager getRegistrationManager() {
+ public synchronized RegistrationManager getRegistrationManager() {
return this.registrationManager;
}
@@ -667,7 +649,9 @@ public class Bookie extends BookieCriticalThread {
} catch (KeeperException e) {
throw new MetadataStoreException("Failed to initialize ledger
manager", e);
}
-
+ stateManager = new BookieStateManager(conf, statsLogger,
registrationManager, ledgerDirsManager);
+ // register shutdown handler using trigger mode
+ stateManager.setShutdownHandler(exitCode ->
triggerBookieShutdown(exitCode));
// Initialise ledgerDirMonitor. This would look through all the
// configured directories. When disk errors or all the ledger
// directories are full, would throws exception and fail bookie
startup.
@@ -679,7 +663,7 @@ public class Bookie extends BookieCriticalThread {
if (!conf.isReadOnlyModeEnabled()) {
throw nle;
} else {
- this.transitionToReadOnlyMode();
+ this.stateManager.transitionToReadOnlyMode();
}
}
@@ -694,13 +678,11 @@ public class Bookie extends BookieCriticalThread {
if (!conf.isReadOnlyModeEnabled()) {
throw nle;
} else {
- this.transitionToReadOnlyMode();
+ this.stateManager.transitionToReadOnlyMode();
}
}
}
- // ZK ephemeral node for this Bookie.
- this.bookieId = getMyId();
// instantiate the journals
journals = Lists.newArrayList();
@@ -722,6 +704,7 @@ public class Bookie extends BookieCriticalThread {
ledgerManager,
ledgerDirsManager,
indexDirsManager,
+ stateManager,
checkpointSource,
syncThread,
statsLogger);
@@ -737,28 +720,6 @@ public class Bookie extends BookieCriticalThread {
readEntryStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY);
addBytesStats = statsLogger.getOpStatsLogger(BOOKIE_ADD_ENTRY_BYTES);
readBytesStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY_BYTES);
- // 1 : up, 0 : readonly, -1 : unregistered
- statsLogger.registerGauge(SERVER_STATUS, new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- if (!rmRegistered.get()){
- return -1;
- } else if (forceReadOnly.get() ||
bookieStatus.isInReadOnlyMode()) {
- return 0;
- } else {
- return 1;
- }
- }
- });
- }
-
- private String getMyId() throws UnknownHostException {
- return Bookie.getBookieAddress(conf).toString();
}
void readJournal() throws IOException, BookieException {
@@ -877,19 +838,13 @@ public class Bookie extends BookieCriticalThread {
ledgerStorage.start();
- // check the bookie status to start with
- if (forceReadOnly.get()) {
- this.bookieStatus.setToReadOnlyMode();
- } else if (conf.isPersistBookieStatusEnabled()) {
-
this.bookieStatus.readFromDirectories(ledgerDirsManager.getAllLedgerDirs());
- }
-
- // set running here.
+ // check the bookie status to start with, and set running.
// since bookie server use running as a flag to tell bookie server
whether it is alive
// if setting it in bookie thread, the watcher might run before bookie
thread.
- running = true;
+ stateManager.initState();
+
try {
- registerBookie(true).get();
+ stateManager.registerBookie(true).get();
} catch (Exception e) {
LOG.error("Couldn't register bookie with zookeeper, shutting down
: ", e);
shutdown(ExitCode.ZK_REG_FAIL);
@@ -922,7 +877,7 @@ public class Bookie extends BookieCriticalThread {
@Override
public void allDisksFull() {
// Transition to readOnly mode on all disks full
- transitionToReadOnlyMode();
+ stateManager.transitionToReadOnlyMode();
}
@Override
@@ -934,13 +889,13 @@ public class Bookie extends BookieCriticalThread {
@Override
public void diskWritable(File disk) {
// Transition to writable mode when a disk becomes writable
again.
- transitionToWritableMode();
+ stateManager.transitionToWritableMode();
}
@Override
public void diskJustWritable(File disk) {
// Transition to writable mode when a disk becomes writable
again.
- transitionToWritableMode();
+ stateManager.transitionToWritableMode();
}
};
}
@@ -959,163 +914,21 @@ public class Bookie extends BookieCriticalThread {
RegistrationManager manager = ReflectionUtils.newInstance(managerCls);
return manager.initialize(conf, () -> {
- rmRegistered.set(false);
+ stateManager.forceToUnregistered();
// schedule a re-register operation
- registerBookie(false);
+ stateManager.registerBookie(false);
}, statsLogger);
}
- /**
- * Register as an available bookie.
- */
- protected Future<Void> registerBookie(final boolean throwException) {
- return stateService.submit(new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- try {
- doRegisterBookie();
- } catch (IOException ioe) {
- if (throwException) {
- throw ioe;
- } else {
- LOG.error("Couldn't register bookie with zookeeper,
shutting down : ", ioe);
- triggerBookieShutdown(ExitCode.ZK_REG_FAIL);
- }
- }
- return (Void) null;
- }
- });
- }
-
- protected void doRegisterBookie() throws IOException {
- doRegisterBookie(forceReadOnly.get() ||
bookieStatus.isInReadOnlyMode());
- }
-
- private void doRegisterBookie(boolean isReadOnly) throws IOException {
- if (null == registrationManager || ((ZKRegistrationManager)
this.registrationManager).getZk() == null) {
- // registration manager is null, means not register itself to zk.
- // ZooKeeper is null existing only for testing.
- LOG.info("null zk while do register");
- return;
- }
-
- rmRegistered.set(false);
- try {
- registrationManager.registerBookie(bookieId, isReadOnly);
- rmRegistered.set(true);
- } catch (BookieException e) {
- throw new IOException(e);
- }
- }
-
-
- /**
- * Transition the bookie from readOnly mode to writable.
- */
- private Future<Void> transitionToWritableMode() {
- return stateService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- doTransitionToWritableMode();
- return null;
- }
- });
- }
-
- @VisibleForTesting
- public void doTransitionToWritableMode() {
- if (shuttingdown || forceReadOnly.get()) {
- return;
- }
-
- if (!bookieStatus.setToWritableMode()) {
- // do nothing if already in writable mode
- return;
- }
- LOG.info("Transitioning Bookie to Writable mode and will serve
read/write requests.");
- if (conf.isPersistBookieStatusEnabled()) {
-
bookieStatus.writeToDirectories(ledgerDirsManager.getAllLedgerDirs());
- }
- // change zookeeper state only when using zookeeper
- if (null == registrationManager) {
- return;
- }
- try {
- doRegisterBookie(false);
- } catch (IOException e) {
- LOG.warn("Error in transitioning back to writable mode : ", e);
- transitionToReadOnlyMode();
- return;
- }
- // clear the readonly state
- try {
- registrationManager.unregisterBookie(bookieId, true);
- } catch (BookieException e) {
- // if we failed when deleting the readonly flag in zookeeper, it
is OK since client would
- // already see the bookie in writable list. so just log the
exception
- LOG.warn("Failed to delete bookie readonly state in zookeeper : ",
e);
- return;
- }
- }
-
- /**
- * Transition the bookie to readOnly mode.
- */
- private Future<Void> transitionToReadOnlyMode() {
- return stateService.submit(new Callable<Void>() {
- @Override
- public Void call() {
- doTransitionToReadOnlyMode();
- return (Void) null;
- }
- });
- }
-
- @VisibleForTesting
- public void doTransitionToReadOnlyMode() {
- if (shuttingdown) {
- return;
- }
- if (!bookieStatus.setToReadOnlyMode()) {
- return;
- }
- if (!conf.isReadOnlyModeEnabled()) {
- LOG.warn("ReadOnly mode is not enabled. "
- + "Can be enabled by configuring "
- + "'readOnlyModeEnabled=true' in configuration."
- + "Shutting down bookie");
- triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
- return;
- }
- LOG.info("Transitioning Bookie to ReadOnly mode,"
- + " and will serve only read requests from clients!");
- // persist the bookie status if we enable this
- if (conf.isPersistBookieStatusEnabled()) {
-
this.bookieStatus.writeToDirectories(ledgerDirsManager.getAllLedgerDirs());
- }
- // change zookeeper state only when using zookeeper
- if (null == registrationManager) {
- return;
- }
- try {
- registrationManager.registerBookie(bookieId, true);
- } catch (BookieException e) {
- LOG.error("Error in transition to ReadOnly Mode."
- + " Shutting down", e);
- triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
- return;
- }
- }
-
/*
- * Check whether Bookie is writable
+ * Check whether Bookie is writable.
*/
public boolean isReadOnly() {
- return forceReadOnly.get() || bookieStatus.isInReadOnlyMode();
+ return stateManager.isReadOnly();
}
public boolean isRunning() {
- return running;
+ return stateManager.isRunning();
}
@Override
@@ -1137,7 +950,7 @@ public class Bookie extends BookieCriticalThread {
LOG.warn("Interrupted on running journal thread : ", ie);
}
// if the journal thread quits due to shutting down, it is ok
- if (!shuttingdown) {
+ if (!stateManager.isShuttingDown()) {
// some error found in journal thread and it quits
// following add operations to it would hang unit client timeout
// so we should let bookie server exists
@@ -1175,19 +988,19 @@ public class Bookie extends BookieCriticalThread {
// when encountering exception
synchronized int shutdown(int exitCode) {
try {
- if (running) { // avoid shutdown twice
+ if (isRunning()) { // avoid shutdown twice
// the exitCode only set when first shutdown usually due to
exception found
LOG.info("Shutting down Bookie-{} with exitCode {}",
conf.getBookiePort(), exitCode);
if (this.exitCode == ExitCode.OK) {
this.exitCode = exitCode;
}
- // mark bookie as in shutting down progress
- shuttingdown = true;
+
+ stateManager.forceToShuttingDown();
// turn bookie to read only during shutting down process
LOG.info("Turning bookie to read only during shut down");
- this.forceReadOnly.set(true);
+ stateManager.forceToReadOnly();
// Shutdown Sync thread
syncThread.shutdown();
@@ -1219,8 +1032,6 @@ public class Bookie extends BookieCriticalThread {
idxMonitor.shutdown();
}
- // Shutdown the state service
- stateService.shutdown();
}
// Shutdown the ZK client
if (registrationManager != null) {
@@ -1231,7 +1042,7 @@ public class Bookie extends BookieCriticalThread {
} finally {
// setting running to false here, so watch thread
// in bookie server know it only after bookie shut down
- running = false;
+ stateManager.close();
}
return this.exitCode;
}
@@ -1306,7 +1117,7 @@ public class Bookie extends BookieCriticalThread {
}
success = true;
} catch (NoWritableLedgerDirException e) {
- transitionToReadOnlyMode();
+ stateManager.transitionToReadOnlyMode();
throw new IOException(e);
} finally {
long elapsedNanos = MathUtils.elapsedNanos(requestNanos);
@@ -1331,7 +1142,7 @@ public class Bookie extends BookieCriticalThread {
handle.setExplicitLac(entry);
}
} catch (NoWritableLedgerDirException e) {
- transitionToReadOnlyMode();
+ stateManager.transitionToReadOnlyMode();
throw new IOException(e);
}
}
@@ -1366,7 +1177,7 @@ public class Bookie extends BookieCriticalThread {
}
success = true;
} catch (NoWritableLedgerDirException e) {
- transitionToReadOnlyMode();
+ stateManager.transitionToReadOnlyMode();
throw new IOException(e);
} finally {
long elapsedNanos = MathUtils.elapsedNanos(requestNanos);
@@ -1458,6 +1269,11 @@ public class Bookie extends BookieCriticalThread {
return ledgerStorage;
}
+ @VisibleForTesting
+ public BookieStateManager getStateManager() {
+ return (BookieStateManager) this.stateManager;
+ }
+
// The rest of the code is test stuff
static class CounterCallback implements WriteCallback {
int count;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index bfc24fc..780852a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -2331,8 +2331,8 @@ public class BookieShell implements Tool {
};
interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerIndexManager,
- checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
- dbStorage.initialize(conf, null, ledgerDirsManager,
ledgerIndexManager,
+ null, checkpointSource, checkpointer,
NullStatsLogger.INSTANCE);
+ dbStorage.initialize(conf, null, ledgerDirsManager,
ledgerIndexManager, null,
checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
int convertedLedgers = 0;
@@ -2420,10 +2420,10 @@ public class BookieShell implements Tool {
}
};
- dbStorage.initialize(conf, null, ledgerDirsManager,
ledgerIndexManager,
+ dbStorage.initialize(conf, null, ledgerDirsManager,
ledgerIndexManager, null,
checkpointSource, checkpointer,
NullStatsLogger.INSTANCE);
interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerIndexManager,
- checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+ null, checkpointSource, checkpointer,
NullStatsLogger.INSTANCE);
LedgerCache interleavedLedgerCache =
interleavedStorage.ledgerCache;
EntryLocationIndex dbEntryLocationIndex =
dbStorage.getEntryLocationIndex();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
new file mode 100644
index 0000000..093dcad
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
@@ -0,0 +1,310 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_STATUS;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of StateManager.
+ */
+public class BookieStateManager implements StateManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(BookieStateManager.class);
+ private final ServerConfiguration conf;
+ private final LedgerDirsManager ledgerDirsManager;
+
+
+ // use an executor to execute the state changes task
+ final ExecutorService stateService = Executors.newSingleThreadExecutor(
+ new
ThreadFactoryBuilder().setNameFormat("BookieStateManagerService-%d").build());
+
+ // Running flag
+ private volatile boolean running = false;
+ // Flag identify whether it is in shutting down progress
+ private volatile boolean shuttingdown = false;
+ // Bookie status
+ private final BookieStatus bookieStatus = new BookieStatus();
+ private final AtomicBoolean rmRegistered = new AtomicBoolean(false);
+ private final AtomicBoolean forceReadOnly = new AtomicBoolean(false);
+
+ private final String bookieId;
+ private ShutdownHandler shutdownHandler;
+ private RegistrationManager registrationManager;
+ // Expose Stats
+ private final StatsLogger statsLogger;
+
+
+ public BookieStateManager(ServerConfiguration conf, StatsLogger
statsLogger,
+ RegistrationManager registrationManager, LedgerDirsManager
ledgerDirsManager) throws IOException {
+ this.conf = conf;
+ this.statsLogger = statsLogger;
+ this.registrationManager = registrationManager;
+ this.ledgerDirsManager = ledgerDirsManager;
+ // ZK ephemeral node for this Bookie.
+ this.bookieId = getMyId();
+ // 1 : up, 0 : readonly, -1 : unregistered
+ statsLogger.registerGauge(SERVER_STATUS, new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ if (!rmRegistered.get()){
+ return -1;
+ } else if (forceReadOnly.get() ||
bookieStatus.isInReadOnlyMode()) {
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+ });
+ }
+
+ @VisibleForTesting
+ BookieStateManager(ServerConfiguration conf, RegistrationManager
registrationManager) throws IOException {
+ this(conf, NullStatsLogger.INSTANCE, registrationManager, new
LedgerDirsManager(conf, conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()),
+ NullStatsLogger.INSTANCE));
+ }
+
+ @Override
+ public void initState(){
+ if (forceReadOnly.get()) {
+ this.bookieStatus.setToReadOnlyMode();
+ } else if (conf.isPersistBookieStatusEnabled()) {
+
this.bookieStatus.readFromDirectories(ledgerDirsManager.getAllLedgerDirs());
+ }
+ running = true;
+ }
+
+ @Override
+ public void forceToShuttingDown(){
+ // mark bookie as in shutting down progress
+ shuttingdown = true;
+ }
+
+ @Override
+ public void forceToReadOnly(){
+ this.forceReadOnly.set(true);
+ }
+
+ @Override
+ public void forceToUnregistered(){
+ this.rmRegistered.set(false);
+ }
+
+ @Override
+ public boolean isReadOnly(){
+ return forceReadOnly.get() || bookieStatus.isInReadOnlyMode();
+ }
+
+ @Override
+ public boolean isRunning(){
+ return running;
+ }
+
+ @Override
+ public boolean isShuttingDown(){
+ return shuttingdown;
+ }
+
+ @Override
+ public void close() {
+ this.running = false;
+ stateService.shutdown();
+ }
+
+ @Override
+ public Future<Void> registerBookie(final boolean throwException) {
+ return stateService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ try {
+ doRegisterBookie();
+ } catch (IOException ioe) {
+ if (throwException) {
+ throw ioe;
+ } else {
+ LOG.error("Couldn't register bookie with zookeeper,
shutting down : ", ioe);
+ shutdownHandler.shutdown(ExitCode.ZK_REG_FAIL);
+ }
+ }
+ return (Void) null;
+ }
+ });
+ }
+
+ @Override
+ public Future<Void> transitionToWritableMode() {
+ return stateService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception{
+ doTransitionToWritableMode();
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public Future<Void> transitionToReadOnlyMode() {
+ return stateService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception{
+ doTransitionToReadOnlyMode();
+ return null;
+ }
+ });
+ }
+
+ void doRegisterBookie() throws IOException {
+ doRegisterBookie(forceReadOnly.get() ||
bookieStatus.isInReadOnlyMode());
+ }
+
+ private void doRegisterBookie(boolean isReadOnly) throws IOException {
+ if (null == registrationManager || ((ZKRegistrationManager)
this.registrationManager).getZk() == null) {
+ // registration manager is null, means not register itself to zk.
+ // ZooKeeper is null existing only for testing.
+ LOG.info("null zk while do register");
+ return;
+ }
+
+ rmRegistered.set(false);
+ try {
+ registrationManager.registerBookie(bookieId, isReadOnly);
+ rmRegistered.set(true);
+ } catch (BookieException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @VisibleForTesting
+ public void doTransitionToWritableMode() {
+ if (shuttingdown || forceReadOnly.get()) {
+ return;
+ }
+
+ if (!bookieStatus.setToWritableMode()) {
+ // do nothing if already in writable mode
+ return;
+ }
+ LOG.info("Transitioning Bookie to Writable mode and will serve
read/write requests.");
+ if (conf.isPersistBookieStatusEnabled()) {
+
bookieStatus.writeToDirectories(ledgerDirsManager.getAllLedgerDirs());
+ }
+ // change zookeeper state only when using zookeeper
+ if (null == registrationManager) {
+ return;
+ }
+ try {
+ doRegisterBookie(false);
+ } catch (IOException e) {
+ LOG.warn("Error in transitioning back to writable mode : ", e);
+ transitionToReadOnlyMode();
+ return;
+ }
+ // clear the readonly state
+ try {
+ registrationManager.unregisterBookie(bookieId, true);
+ } catch (BookieException e) {
+ // if we failed when deleting the readonly flag in zookeeper, it
is OK since client would
+ // already see the bookie in writable list. so just log the
exception
+ LOG.warn("Failed to delete bookie readonly state in zookeeper : ",
e);
+ return;
+ }
+ }
+
+ @VisibleForTesting
+ public void doTransitionToReadOnlyMode() {
+ if (shuttingdown) {
+ return;
+ }
+ if (!bookieStatus.setToReadOnlyMode()) {
+ return;
+ }
+ if (!conf.isReadOnlyModeEnabled()) {
+ LOG.warn("ReadOnly mode is not enabled. "
+ + "Can be enabled by configuring "
+ + "'readOnlyModeEnabled=true' in configuration."
+ + " Shutting down bookie");
+ shutdownHandler.shutdown(ExitCode.BOOKIE_EXCEPTION);
+ return;
+ }
+ LOG.info("Transitioning Bookie to ReadOnly mode,"
+ + " and will serve only read requests from clients!");
+ // persist the bookie status if we enable this
+ if (conf.isPersistBookieStatusEnabled()) {
+
this.bookieStatus.writeToDirectories(ledgerDirsManager.getAllLedgerDirs());
+ }
+ // change zookeeper state only when using zookeeper
+ if (null == registrationManager) {
+ return;
+ }
+ try {
+ registrationManager.registerBookie(bookieId, true);
+ } catch (BookieException e) {
+ LOG.error("Error in transition to ReadOnly Mode."
+ + " Shutting down", e);
+ shutdownHandler.shutdown(ExitCode.BOOKIE_EXCEPTION);
+ return;
+ }
+ }
+ public void setShutdownHandler(ShutdownHandler handler){
+ shutdownHandler = handler;
+ }
+
+ private String getMyId() throws UnknownHostException {
+ return Bookie.getBookieAddress(conf).toString();
+ }
+
+ @VisibleForTesting
+ public void setRegistrationManager(RegistrationManager rm) {
+ this.registrationManager = rm;
+ }
+ @VisibleForTesting
+ public ShutdownHandler getShutdownHandler(){
+ return shutdownHandler;
+ }
+ @VisibleForTesting
+ boolean isRegistered(){
+ return rmRegistered.get();
+ }
+}
+
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index bcb5dc5..5d5d74e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -90,6 +90,7 @@ public class InterleavedLedgerStorage implements
CompactableLedgerStorage, Entry
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
+ StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
StatsLogger statsLogger)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index ac0df00..83ac2c0 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -45,6 +45,7 @@ public interface LedgerStorage {
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
+ StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
StatsLogger statsLogger)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
index 3970a1d..464f127 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
@@ -22,7 +22,6 @@
package org.apache.bookkeeper.bookie;
import java.io.IOException;
-
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.zookeeper.KeeperException;
@@ -42,8 +41,22 @@ public class ReadOnlyBookie extends Bookie {
public ReadOnlyBookie(ServerConfiguration conf, StatsLogger statsLogger)
throws IOException, KeeperException, InterruptedException,
BookieException {
super(conf, statsLogger);
+ stateManager = new BookieStateManager(conf, statsLogger,
registrationManager, getLedgerDirsManager()) {
+
+ @Override
+ public void doTransitionToWritableMode() {
+ // no-op
+ LOG.info("Skip transition to writable mode for readonly
bookie");
+ }
+
+ @Override
+ public void doTransitionToReadOnlyMode() {
+ // no-op
+ LOG.info("Skip transition to readonly mode for readonly
bookie");
+ }
+ };
if (conf.isReadOnlyModeEnabled()) {
- forceReadOnly.set(true);
+ stateManager.forceToReadOnly();
} else {
String err = "Try to init ReadOnly Bookie, while ReadOnly mode is
not enabled";
LOG.error(err);
@@ -51,16 +64,4 @@ public class ReadOnlyBookie extends Bookie {
}
LOG.info("Running bookie in force readonly mode.");
}
-
- @Override
- public void doTransitionToWritableMode() {
- // no-op
- LOG.info("Skip transition to writable mode for readonly bookie");
- }
-
- @Override
- public void doTransitionToReadOnlyMode() {
- // no-op
- LOG.info("Skip transition to readonly mode for readonly bookie");
- }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index b0c6bad..0e3e3b9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -47,6 +47,7 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
EntryMemTable memTable;
private ScheduledExecutorService scheduler;
+ private StateManager stateManager;
public SortedLedgerStorage() {
super();
@@ -57,6 +58,7 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
+ StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
StatsLogger statsLogger)
@@ -66,6 +68,7 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
ledgerManager,
ledgerDirsManager,
indexDirsManager,
+ stateManager,
checkpointSource,
checkpointer,
statsLogger);
@@ -74,6 +77,7 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
new ThreadFactoryBuilder()
.setNameFormat("SortedLedgerStorage-%d")
.setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) /
2).build());
+ this.stateManager = stateManager;
}
@VisibleForTesting
@@ -218,8 +222,7 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
checkpointer.startCheckpoint(cp);
}
} catch (IOException e) {
- // TODO: if we failed to flush data, we should switch the
bookie back to readonly mode
- // or shutdown it. {@link
https://github.com/apache/bookkeeper/issues/280}
+ stateManager.transitionToReadOnlyMode();
LOG.error("Exception thrown while flushing skip list
cache.", e);
}
}
@@ -233,4 +236,9 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
// can happen because compaction. in a sorted ledger storage,
checkpoint should happen after the data is
// flushed to the entry log file.
}
+
+ BookieStateManager getStateManager(){
+ return (BookieStateManager) stateManager;
+ }
+
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
new file mode 100644
index 0000000..ad4ac0c
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.util.concurrent.Future;
+
+/**
+ * State management of Bookie, including register, turn bookie to w/r mode.
+ */
+public interface StateManager extends AutoCloseable {
+
+
+ /**
+ * Init state of Bookie when launch bookie.
+ */
+ void initState();
+
+ /**
+ * Check is ReadOnly.
+ */
+ boolean isReadOnly();
+
+ /**
+ * Check is Running.
+ */
+ boolean isRunning();
+
+ /**
+ * Check is Shutting down.
+ */
+ boolean isShuttingDown();
+
+ /**
+ * Close the manager, release its resources.
+ */
+ @Override
+ void close();
+
+ /**
+ * Register the bookie to RegistrationManager.
+ * @params throwException, whether throwException or not
+ */
+ Future<Void> registerBookie(boolean throwException);
+
+ // forceTos methods below should be called inside Bookie,
+ // which indicates important state of bookie and should be visible fast.
+ /**
+ * Turn state to the shutting down progress,just the flag.
+ */
+ void forceToShuttingDown();
+
+ /**
+ * Turn state to the read only, just flag.
+ */
+ void forceToReadOnly();
+
+ /**
+ * Turn state to not registered, just the flag.
+ */
+ void forceToUnregistered();
+
+ /**
+ * Change the state of bookie to Writable mode.
+ */
+ Future<Void> transitionToWritableMode();
+
+ /**
+ * Change the state of bookie to ReadOnly mode.
+ */
+ Future<Void> transitionToReadOnlyMode();
+
+ /**
+ * ShutdownHandler used to shutdown bookie.
+ */
+ interface ShutdownHandler {
+ void shutdown(int code);
+ }
+
+ void setShutdownHandler(ShutdownHandler handler);
+}
+
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index b0257ec..4852fbe 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -53,6 +53,7 @@ import org.apache.bookkeeper.bookie.EntryLogger;
import org.apache.bookkeeper.bookie.GarbageCollectorThread;
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.StateManager;
import
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
import org.apache.bookkeeper.common.util.Watcher;
@@ -136,8 +137,8 @@ public class DbLedgerStorage implements
CompactableLedgerStorage {
@Override
public void initialize(ServerConfiguration conf, LedgerManager
ledgerManager, LedgerDirsManager ledgerDirsManager,
- LedgerDirsManager indexDirsManager, CheckpointSource
checkpointSource, Checkpointer checkpointer,
- StatsLogger statsLogger) throws IOException {
+ LedgerDirsManager indexDirsManager, StateManager stateManager,
CheckpointSource checkpointSource,
+ Checkpointer checkpointer, StatsLogger statsLogger)
throws IOException {
checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
"Db implementation only allows for one storage dir");
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 0c240da..2e42b05 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
@@ -109,8 +108,9 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
}
void testRegisterBookie(ServerConfiguration conf) throws IOException {
- super.doRegisterBookie();
+ super.getStateManager().doRegisterBookie();
}
+
}
/**
@@ -133,7 +133,7 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
BookieException {
MockBookie bookie = new MockBookie(conf);
rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
- bookie.registrationManager = rm;
+ bookie.setRegistrationManager(rm);
((ZKRegistrationManager)
bookie.registrationManager).setZk(zkc);
((ZKRegistrationManager)
bookie.registrationManager).getZk().close();
return bookie;
@@ -162,8 +162,7 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
MockBookie b = new MockBookie(conf);
conf.setZkServers(zkUtil.getZooKeeperConnectString());
rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
- b.registrationManager = rm;
-
+ b.setRegistrationManager(rm);
b.testRegisterBookie(conf);
ZooKeeper zooKeeper = ((ZKRegistrationManager) rm).getZk();
assertNotNull("Bookie registration node doesn't exists!",
@@ -196,8 +195,7 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
conf.setZkServers(zkUtil.getZooKeeperConnectString());
rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
- b.registrationManager = rm;
-
+ b.setRegistrationManager(rm);
b.testRegisterBookie(conf);
Stat bkRegNode1 = ((ZKRegistrationManager)
rm).getZk().exists(bkRegPath, false);
@@ -208,7 +206,7 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
// zkclient and doing the registration.
RegistrationManager newRm = new ZKRegistrationManager();
newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
- b.registrationManager = newRm;
+ b.setRegistrationManager(newRm);
try (ZooKeeperClient newZk = createNewZKClient()) {
// deleting the znode, so that the bookie registration should
@@ -308,8 +306,7 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
conf.setZkServers(zkUtil.getZooKeeperConnectString());
rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
- b.registrationManager = rm;
-
+ b.setRegistrationManager(rm);
b.testRegisterBookie(conf);
Stat bkRegNode1 = zkc.exists(bkRegPath, false);
assertNotNull("Bookie registration node doesn't exists!",
@@ -320,7 +317,7 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
ZooKeeperClient newzk = createNewZKClient();
RegistrationManager newRm = new ZKRegistrationManager();
newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
- b.registrationManager = newRm;
+ b.setRegistrationManager(newRm);
try {
b.testRegisterBookie(conf);
fail("Should throw NodeExistsException as the znode is not getting
expired");
@@ -814,7 +811,7 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
Bookie bookie = bookieServer.getBookie();
assertFalse(bookie.isReadOnly());
// transition to readonly mode, bookie status should be persisted in
ledger disks
- bookie.doTransitionToReadOnlyMode();
+ bookie.getStateManager().doTransitionToReadOnlyMode();
assertTrue(bookie.isReadOnly());
// restart bookie should start in read only mode
@@ -824,7 +821,7 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
bookie = bookieServer.getBookie();
assertTrue(bookie.isReadOnly());
// transition to writable mode
- bookie.doTransitionToWritableMode();
+ bookie.getStateManager().doTransitionToWritableMode();
// restart bookie should start in writable mode
bookieServer.shutdown();
bookieServer = new BookieServer(conf);
@@ -850,8 +847,8 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
bookieServer.start();
Bookie bookie = bookieServer.getBookie();
// persist bookie status
- bookie.doTransitionToReadOnlyMode();
- bookie.doTransitionToWritableMode();
+ bookie.getStateManager().doTransitionToReadOnlyMode();
+ bookie.getStateManager().doTransitionToWritableMode();
assertFalse(bookie.isReadOnly());
bookieServer.shutdown();
// start read only bookie
@@ -863,7 +860,7 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
bookie = bookieServer.getBookie();
assertTrue(bookie.isReadOnly());
// transition to writable should fail
- bookie.doTransitionToWritableMode();
+ bookie.getStateManager().doTransitionToWritableMode();
assertTrue(bookie.isReadOnly());
bookieServer.shutdown();
}
@@ -892,7 +889,7 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
// transition in to read only and persist the status on disk
Bookie bookie = bookieServer.getBookie();
assertFalse(bookie.isReadOnly());
- bookie.doTransitionToReadOnlyMode();
+ bookie.getStateManager().doTransitionToReadOnlyMode();
assertTrue(bookie.isReadOnly());
// corrupt status file
List<File> ledgerDirs =
bookie.getLedgerDirsManager().getAllLedgerDirs();
@@ -931,7 +928,7 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
// transition in to read only and persist the status on disk
Bookie bookie = bookieServer.getBookie();
assertFalse(bookie.isReadOnly());
- bookie.doTransitionToReadOnlyMode();
+ bookie.getStateManager().doTransitionToReadOnlyMode();
assertTrue(bookie.isReadOnly());
// Manually update a status file, so it becomes the latest
Thread.sleep(1);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index a678139..8fb7980 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -25,10 +25,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -40,7 +38,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
@@ -235,7 +232,6 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
// Do nothing.
}
};
-
for (File journalDir : conf.getJournalDirs()) {
Bookie.checkDirectoryStructure(journalDir);
}
@@ -248,6 +244,7 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
LedgerManagerFactory.newLedgerManagerFactory(conf,
zkc).newLedgerManager(),
dirManager,
dirManager,
+ null,
cp,
Checkpointer.NULL,
NullStatsLogger.INSTANCE);
@@ -636,6 +633,7 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
manager,
dirs,
dirs,
+ null,
checkpointSource,
Checkpointer.NULL,
NullStatsLogger.INSTANCE);
@@ -660,7 +658,7 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
storage.initialize(
conf,
manager,
- dirs, dirs,
+ dirs, dirs, null,
checkpointSource,
Checkpointer.NULL,
NullStatsLogger.INSTANCE);
@@ -684,6 +682,7 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
manager,
dirs,
dirs,
+ null,
checkpointSource,
Checkpointer.NULL,
NullStatsLogger.INSTANCE);
@@ -788,6 +787,7 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
manager,
dirs,
dirs,
+ null,
checkpointSource,
Checkpointer.NULL,
NullStatsLogger.INSTANCE);
@@ -843,6 +843,7 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
LedgerManagerFactory.newLedgerManagerFactory(conf,
zkc).newLedgerManager(),
dirManager,
dirManager,
+ null,
cp,
Checkpointer.NULL,
NullStatsLogger.INSTANCE);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index b14358b..72e7d27 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -21,6 +21,7 @@
package org.apache.bookkeeper.bookie;
+import static org.apache.bookkeeper.bookie.BookieException.Code.OK;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -36,13 +37,14 @@ import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.IOUtils;
@@ -474,6 +476,7 @@ public class LedgerCacheTest {
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
+ StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
StatsLogger statsLogger) throws IOException {
@@ -482,6 +485,7 @@ public class LedgerCacheTest {
ledgerManager,
ledgerDirsManager,
indexDirsManager,
+ stateManager,
checkpointSource,
checkpointer,
statsLogger);
@@ -500,6 +504,20 @@ public class LedgerCacheTest {
}
super.process(ledgerId, entryId, buffer);
}
+ // simplified memTable full callback.
+ @Override
+ public void onSizeLimitReached(final CheckpointSource.Checkpoint cp)
throws IOException {
+ LOG.info("Reached size {}", cp);
+ // use synchronous way
+ try {
+ LOG.info("Started flushing mem table.");
+ memTable.flush(FlushTestSortedLedgerStorage.this);
+ } catch (IOException e) {
+ getStateManager().doTransitionToReadOnlyMode();
+ LOG.error("Exception thrown while flushing skip list cache.",
e);
+ }
+ }
+
}
@Test
@@ -523,7 +541,6 @@ public class LedgerCacheTest {
// without the fileinfo, 'flushTestSortedLedgerStorage.addEntry' calls
will fail
// because of BOOKKEEPER-965 change.
bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(),
null, "passwd".getBytes());
-
flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
assertFalse("Bookie is expected to be in ReadWrite mode",
bookie.isReadOnly());
assertTrue("EntryMemTable SnapShot is expected to be empty",
memTable.snapshot.isEmpty());
@@ -548,6 +565,51 @@ public class LedgerCacheTest {
memTable.snapshot.isEmpty());
}
+ @Test
+ public void testSortedLedgerFlushFailure() throws Exception {
+ // most of the code is same to the testEntryMemTableFlushFailure
+ File tmpDir = createTempDir("bkTest", ".dir");
+ File curDir = Bookie.getCurrentDirectory(tmpDir);
+ Bookie.checkDirectoryStructure(curDir);
+
+ int gcWaitTime = 1000;
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setGcWaitTime(gcWaitTime)
+ .setLedgerDirNames(new String[] { tmpDir.toString() })
+ .setJournalDirName(tmpDir.toString())
+
.setLedgerStorageClass(FlushTestSortedLedgerStorage.class.getName());
+
+ Bookie bookie = new Bookie(conf);
+ bookie.start();
+ FlushTestSortedLedgerStorage flushTestSortedLedgerStorage =
(FlushTestSortedLedgerStorage) bookie.ledgerStorage;
+ EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
+
+ bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(),
null, "passwd".getBytes());
+ flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
+ assertFalse("Bookie is expected to be in ReadWrite mode",
bookie.isReadOnly());
+ assertTrue("EntryMemTable SnapShot is expected to be empty",
memTable.snapshot.isEmpty());
+
+ // set flags, so that FlushTestSortedLedgerStorage simulates
FlushFailure scenario
+ flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
+ flushTestSortedLedgerStorage.setInjectFlushException(true);
+ flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
+
+ // since we simulated sizeLimitReached, snapshot shouldn't be empty
+ assertFalse("EntryMemTable SnapShot is not expected to be empty",
memTable.snapshot.isEmpty());
+ // after flush failure, the bookie is set to readOnly
+ assertTrue("Bookie is expected to be in Read mode",
bookie.isReadOnly());
+ // write fail
+ bookie.addEntry(generateEntry(1, 3), new
BookkeeperInternalCallbacks.WriteCallback(){
+ public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx){
+ LOG.info("fail write to bk");
+ assertTrue(rc != OK);
+ };
+
+ }, null, "passwd".getBytes());
+ bookie.shutdown();
+
+ }
+
private ByteBuf generateEntry(long ledger, long entry) {
byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
ByteBuf bb = Unpooled.buffer(8 + 8 + data.length);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index c70b0e0..2dfebdf 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -119,11 +119,13 @@ public class SortedLedgerStorageCheckpointTest extends
LedgerStorageTestBase {
log.error("Failed to checkpoint at {}", checkpoint, e);
}
});
+ // if the SortedLedgerStorage need not to change bookie's state, pass
StateManager==null is ok
this.storage.initialize(
conf,
mock(LedgerManager.class),
ledgerDirsManager,
ledgerDirsManager,
+ null,
checkpointSrc,
checkpointer,
NullStatsLogger.INSTANCE);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
new file mode 100644
index 0000000..8c8b636
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
@@ -0,0 +1,258 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static
org.apache.bookkeeper.bookie.BookieException.Code.MetadataStoreException;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Testing StateManager cases.
+ */
+public class StateManagerTest extends BookKeeperClusterTestCase {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(StateManagerTest.class);
+
+ @Rule
+ public final TestName runtime = new TestName();
+ final ServerConfiguration conf;
+ MockZKRegistrationManager rm;
+
+ public StateManagerTest(){
+ super(0);
+ String ledgersPath = "/" + "ledgers" + runtime.getMethodName();
+ baseClientConf.setZkLedgersRootPath(ledgersPath);
+ baseConf.setZkLedgersRootPath(ledgersPath);
+ conf = TestBKConfiguration.newServerConfiguration();
+
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ zkUtil.createBKEnsemble("/" + runtime.getMethodName());
+ rm = new MockZKRegistrationManager();
+ File tmpDir = createTempDir("stateManger", "test");
+ conf.setJournalDirName(tmpDir.getPath())
+ .setLedgerDirNames(new String[] { tmpDir.getPath() })
+ .setJournalDirName(tmpDir.toString())
+ .setZkServers(zkUtil.getZooKeeperConnectString());
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if (rm != null) {
+ rm.close();
+ }
+ }
+
+ private static class MockZKRegistrationManager extends
ZKRegistrationManager {
+ boolean registerFailed = false;
+ void setRegisterFail(boolean failOrNot){
+ registerFailed = failOrNot;
+ }
+ @Override
+ public void registerBookie(String bookieId, boolean readOnly) throws
BookieException {
+ if (registerFailed) {
+ throw BookieException.create(MetadataStoreException);
+ }
+ super.registerBookie(bookieId, readOnly);
+ }
+
+ }
+
+ /**
+ * Bookie should shutdown when it register to Registration service fail.
+ * On ZooKeeper exception, should return exit code ZK_REG_FAIL = 4
+ */
+ @Test
+ public void testShutdown() throws Exception {
+ File tmpDir = createTempDir("stateManger", "test");
+
+ final ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setJournalDirName(tmpDir.getPath())
+ .setLedgerDirNames(new String[] { tmpDir.getPath() })
+ .setJournalDirName(tmpDir.toString())
+ .setZkServers(zkUtil.getZooKeeperConnectString());
+
+ BookieServer bkServer = new BookieServer(conf) {
+ protected Bookie newBookie(ServerConfiguration conf)
+ throws IOException, KeeperException, InterruptedException,
+ BookieException {
+ Bookie bookie = new Bookie(conf);
+ rm.setRegisterFail(true);
+ rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+ bookie.setRegistrationManager(rm);
+ return bookie;
+ }
+ };
+ bkServer.start();
+ bkServer.join();
+ assertTrue("Failed to return failCode ZK_REG_FAIL",
+ ExitCode.ZK_REG_FAIL == bkServer.getExitCode());
+ }
+
+ /**
+ * StateManager can transition between writable mode and readOnly mode if
it was not created with readOnly mode.
+ */
+ @Test
+ public void testNormalBookieTransitions() throws Exception {
+ BookieStateManager stateManager = new BookieStateManager(conf, rm);
+ rm.initialize(conf, () -> {
+ stateManager.forceToUnregistered();
+ // schedule a re-register operation
+ stateManager.registerBookie(false);
+ }, NullStatsLogger.INSTANCE);
+
+ stateManager.initState();
+ stateManager.registerBookie(true).get();
+
+ assertTrue(stateManager.isRunning());
+ assertTrue(stateManager.isRegistered());
+
+ stateManager.transitionToReadOnlyMode().get();
+ assertTrue(stateManager.isReadOnly());
+
+ stateManager.transitionToWritableMode().get();
+ assertTrue(stateManager.isRunning());
+ assertFalse(stateManager.isReadOnly());
+
+ stateManager.close();
+ assertFalse(stateManager.isRunning());
+ }
+
+ @Test
+ public void testReadOnlyDisableBookieTransitions() throws Exception {
+ conf.setReadOnlyModeEnabled(false);
+ // readOnly disabled bk stateManager
+ BookieStateManager stateManager = new BookieStateManager(conf, rm);
+ // simulate sync shutdown logic in bookie
+ stateManager.setShutdownHandler(new StateManager.ShutdownHandler() {
+ @Override
+ public void shutdown(int code) {
+ try {
+ if (stateManager.isRunning()) {
+ stateManager.forceToShuttingDown();
+ stateManager.forceToReadOnly();
+ }
+
+ } finally {
+ stateManager.close();
+ }
+ }
+ });
+ rm.initialize(conf, () -> {
+ stateManager.forceToUnregistered();
+ // schedule a re-register operation
+ stateManager.registerBookie(false);
+ }, NullStatsLogger.INSTANCE);
+
+ stateManager.initState();
+ stateManager.registerBookie(true).get();
+ assertTrue(stateManager.isRunning());
+
+ stateManager.transitionToReadOnlyMode().get();
+ // stateManager2 will shutdown
+ assertFalse(stateManager.isRunning());
+ // different dimension of bookie state: running <--> down, read <-->
write, unregistered <--> registered
+ // bookie2 is set to readOnly when shutdown
+ assertTrue(stateManager.isReadOnly());
+ }
+
+ @Test
+ public void testReadOnlyBookieTransitions() throws Exception{
+ // readOnlybk, which use override stateManager impl
+ File tmpDir = createTempDir("stateManger", "test-readonly");
+ final ServerConfiguration readOnlyConf =
TestBKConfiguration.newServerConfiguration();
+ readOnlyConf.setJournalDirName(tmpDir.getPath())
+ .setLedgerDirNames(new String[] { tmpDir.getPath() })
+ .setJournalDirName(tmpDir.toString())
+ .setZkServers(zkUtil.getZooKeeperConnectString())
+ .setForceReadOnlyBookie(true);
+ ReadOnlyBookie readOnlyBookie = new ReadOnlyBookie(readOnlyConf,
NullStatsLogger.INSTANCE);
+ readOnlyBookie.start();
+ assertTrue(readOnlyBookie.isRunning());
+ assertTrue(readOnlyBookie.isReadOnly());
+
+ // transition has no effect if bookie start with readOnly mode
+ readOnlyBookie.getStateManager().transitionToWritableMode().get();
+ assertTrue(readOnlyBookie.isRunning());
+ assertTrue(readOnlyBookie.isReadOnly());
+ readOnlyBookie.shutdown();
+
+ }
+
+ /**
+ * Verify the bookie reg.
+ */
+ @Test
+ public void testRegistration() throws Exception {
+ BookieStateManager stateManager = new BookieStateManager(conf, rm);
+ rm.initialize(conf, () -> {
+ stateManager.forceToUnregistered();
+ // schedule a re-register operation
+ stateManager.registerBookie(false);
+ }, NullStatsLogger.INSTANCE);
+ // simulate sync shutdown logic in bookie
+ stateManager.setShutdownHandler(new StateManager.ShutdownHandler() {
+ @Override
+ public void shutdown(int code) {
+ try {
+ if (stateManager.isRunning()) {
+ stateManager.forceToShuttingDown();
+ stateManager.forceToReadOnly();
+ }
+
+ } finally {
+ stateManager.close();
+ }
+ }
+ });
+ stateManager.initState();
+ // up
+ assertTrue(stateManager.isRunning());
+ // unregistered
+ assertFalse(stateManager.isRegistered());
+
+ stateManager.registerBookie(true).get();
+ // registered
+ assertTrue(stateManager.isRegistered());
+ stateManager.getShutdownHandler().shutdown(ExitCode.OK);
+ // readOnly
+ assertTrue(stateManager.isReadOnly());
+ }
+
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index ea60aa8..e6cb93d 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -271,6 +271,7 @@ public class TestSyncThread {
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
+ StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
StatsLogger statsLogger)
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
index a954f28..75e061a 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
@@ -88,7 +88,7 @@ public class ConversionRollbackTest {
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
DbLedgerStorage dbStorage = new DbLedgerStorage();
- dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
checkpointSource, checkpointer,
+ dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
null, checkpointSource, checkpointer,
NullStatsLogger.INSTANCE);
// Insert some ledger & entries in the dbStorage
@@ -118,8 +118,8 @@ public class ConversionRollbackTest {
// Verify that interleaved storage index has the same entries
InterleavedLedgerStorage interleavedStorage = new
InterleavedLedgerStorage();
- interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
- NullStatsLogger.INSTANCE);
+ interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager,
+ null, checkpointSource, checkpointer,
NullStatsLogger.INSTANCE);
Set<Long> ledgers =
Sets.newTreeSet(interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L,
4L)), ledgers);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
index df5434c..1816945 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
@@ -85,8 +85,8 @@ public class ConversionTest {
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
InterleavedLedgerStorage interleavedStorage = new
InterleavedLedgerStorage();
- interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
- NullStatsLogger.INSTANCE);
+ interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager,
+ null, checkpointSource, checkpointer,
NullStatsLogger.INSTANCE);
// Insert some ledger & entries in the interleaved storage
for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
@@ -115,12 +115,12 @@ public class ConversionTest {
// Verify that db index has the same entries
DbLedgerStorage dbStorage = new DbLedgerStorage();
- dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
checkpointSource, checkpointer,
- NullStatsLogger.INSTANCE);
+ dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
+ null, checkpointSource, checkpointer,
NullStatsLogger.INSTANCE);
interleavedStorage = new InterleavedLedgerStorage();
- interleavedStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
- NullStatsLogger.INSTANCE);
+ interleavedStorage.initialize(conf, null, ledgerDirsManager,
+ ledgerDirsManager, null, checkpointSource, checkpointer,
NullStatsLogger.INSTANCE);
Set<Long> ledgers =
Sets.newTreeSet(dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L,
4L)), ledgers);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
index 4793ec7..c1f7e39 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
@@ -86,7 +86,7 @@ public class LocationsIndexRebuildTest {
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
DbLedgerStorage ledgerStorage = new DbLedgerStorage();
- ledgerStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
+ ledgerStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, null, checkpointSource, checkpointer,
NullStatsLogger.INSTANCE);
// Insert some ledger & entries in the storage
@@ -116,7 +116,7 @@ public class LocationsIndexRebuildTest {
// Verify that db index has the same entries
ledgerStorage = new DbLedgerStorage();
- ledgerStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, checkpointSource, checkpointer,
+ ledgerStorage.initialize(conf, null, ledgerDirsManager,
ledgerDirsManager, null, checkpointSource, checkpointer,
NullStatsLogger.INSTANCE);
Set<Long> ledgers =
Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 730490b..14bb9cf 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -56,6 +56,7 @@ import org.apache.bookkeeper.bookie.GarbageCollector;
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
+import org.apache.bookkeeper.bookie.StateManager;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerMetadata;
@@ -547,6 +548,7 @@ public class GcLedgersTest extends LedgerManagerTestCase {
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
+ StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
StatsLogger statsLogger) throws IOException {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 1ecd9f3..32be5fc 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.EntryLogger;
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.StateManager;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -126,6 +127,7 @@ public abstract class LedgerManagerTestCase extends
BookKeeperClusterTestCase {
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
+ StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
StatsLogger statsLogger) throws IOException {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index b9f46e4..c2e3f3e 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -69,6 +68,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Tests publishing of under replicated ledgers by the Auditor bookie node when
* corresponding bookies identifes as not running.
@@ -295,7 +295,7 @@ public class AuditorLedgerCheckerTest extends
BookKeeperClusterTestCase {
ServerConfiguration bookieConf = bsConfs.get(2);
BookieServer bk = bs.get(2);
bookieConf.setReadOnlyModeEnabled(true);
- bk.getBookie().doTransitionToReadOnlyMode();
+ bk.getBookie().getStateManager().doTransitionToReadOnlyMode();
// grace period for publishing the bk-ledger
LOG.debug("Waiting for Auditor to finish ledger check.");
@@ -321,7 +321,7 @@ public class AuditorLedgerCheckerTest extends
BookKeeperClusterTestCase {
ServerConfiguration bookieConf = bsConfs.get(bkIndex);
BookieServer bk = bs.get(bkIndex);
bookieConf.setReadOnlyModeEnabled(true);
- bk.getBookie().doTransitionToReadOnlyMode();
+ bk.getBookie().getStateManager().doTransitionToReadOnlyMode();
// grace period for publishing the bk-ledger
LOG.debug("Waiting for Auditor to finish ledger check.");
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 300b6f0..5ed514f 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -326,7 +326,7 @@ public abstract class BookKeeperClusterTestCase {
public void setBookieToReadOnly(BookieSocketAddress addr) throws
InterruptedException, UnknownHostException {
for (BookieServer server : bs) {
if (server.getLocalAddress().equals(addr)) {
- server.getBookie().doTransitionToReadOnlyMode();
+
server.getBookie().getStateManager().doTransitionToReadOnlyMode();
break;
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
index 8ee0343..a34bbcb 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
@@ -24,11 +24,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.File;
import java.util.Enumeration;
import java.util.concurrent.TimeUnit;
-
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
@@ -260,7 +258,7 @@ public class ReadOnlyBookieTest extends
BookKeeperClusterTestCase {
killBookie(1);
baseConf.setReadOnlyModeEnabled(true);
startNewBookie();
- bs.get(1).getBookie().doTransitionToReadOnlyMode();
+ bs.get(1).getBookie().getStateManager().doTransitionToReadOnlyMode();
try {
bkc.waitForReadOnlyBookie(Bookie.getBookieAddress(bsConfs.get(1)))
.get(30, TimeUnit.SECONDS);
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].