This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new cf7a72b Refactor EntryLogger class
cf7a72b is described below
commit cf7a72b011378ac5be8e04edd35dfb97e00cedab
Author: cguttapalem <[email protected]>
AuthorDate: Thu May 3 15:49:30 2018 -0700
Refactor EntryLogger class
Descriptions of the changes in this PR:
- Split the EntryLogger class into multiple classes.
- Create separate classes for EntryLogManager, EntryLogManagerBase,
EntryLoggerAllocator and EntryLogManagerForSingleEntryLog.
Author: cguttapalem <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #1365 from reddycharan/refactorentrylogger
---
.../apache/bookkeeper/bookie/EntryLogManager.java | 103 ++++
.../bookkeeper/bookie/EntryLogManagerBase.java | 163 ++++++
.../bookie/EntryLogManagerForSingleEntryLog.java | 263 +++++++++
.../org/apache/bookkeeper/bookie/EntryLogger.java | 588 +--------------------
.../bookkeeper/bookie/EntryLoggerAllocator.java | 215 ++++++++
.../apache/bookkeeper/bookie/CreateNewLogTest.java | 2 -
.../org/apache/bookkeeper/bookie/EntryLogTest.java | 5 +-
.../bookie/LedgerStorageCheckpointTest.java | 1 -
.../bookie/SortedLedgerStorageCheckpointTest.java | 1 -
9 files changed, 758 insertions(+), 583 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java
new file mode 100644
index 0000000..340e9a1
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+
+interface EntryLogManager {
+
+ /*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+ long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws
IOException;
+
+ /*
+ * gets the active logChannel with the given entryLogId. returns null if
+ * it does not exist.
+ */
+ BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+ /*
+ * Returns an eligible writable ledger dir for the creation of the next entrylog
+ */
+ File getDirForNextEntryLog(List<File> writableLedgerDirs);
+
+ /*
+ * Do the operations required for checkpoint.
+ */
+ void checkpoint() throws IOException;
+
+ /*
+ * flush both current and rotated logs.
+ */
+ void flush() throws IOException;
+
+ /*
+ * close current logs.
+ */
+ void close() throws IOException;
+
+ /*
+ * force close current logs.
+ */
+ void forceClose();
+
+ /*
+ * prepare entrylogger/entrylogmanager before doing SortedLedgerStorage
+ * Checkpoint.
+ */
+ void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws
IOException;
+
+ /*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+ void prepareEntryMemTableFlush();
+
+ /*
+ * this method should be called after doing entrymemtable flush, it would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
+ *
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both
+ * would be called from the same thread.
+ *
+ * returns boolean value indicating whether EntryMemTable should do
checkpoint
+ * after this commit method.
+ */
+ boolean commitEntryMemTableFlush() throws IOException;
+
+ /*
+ * creates new separate log for compaction.
+ */
+ BufferedLogChannel createNewLogForCompaction() throws IOException;
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
new file mode 100644
index 0000000..849336f
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
@@ -0,0 +1,163 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.FastThreadLocal;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
+import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+@Slf4j
+abstract class EntryLogManagerBase implements EntryLogManager {
+ volatile List<BufferedLogChannel> rotatedLogChannels;
+ final EntryLoggerAllocator entryLoggerAllocator;
+ private final LedgerDirsManager ledgerDirsManager;
+ private final List<EntryLogger.EntryLogListener> listeners;
+ /**
+ * The maximum size of an entry logger file.
+ */
+ final long logSizeLimit;
+
+ EntryLogManagerBase(ServerConfiguration conf, LedgerDirsManager
ledgerDirsManager,
+ EntryLoggerAllocator entryLoggerAllocator,
List<EntryLogger.EntryLogListener> listeners) {
+ this.ledgerDirsManager = ledgerDirsManager;
+ this.entryLoggerAllocator = entryLoggerAllocator;
+ this.listeners = listeners;
+ this.logSizeLimit = conf.getEntryLogSizeLimit();
+ }
+
+ private final FastThreadLocal<ByteBuf> sizeBufferForAdd = new
FastThreadLocal<ByteBuf>() {
+ @Override
+ protected ByteBuf initialValue() throws Exception {
+ return Unpooled.buffer(4);
+ }
+ };
+
+ /*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+ @Override
+ public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws
IOException {
+ int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to
prepend the size
+ BufferedLogChannel logChannel =
getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
+ ByteBuf sizeBuffer = sizeBufferForAdd.get();
+ sizeBuffer.clear();
+ sizeBuffer.writeInt(entry.readableBytes());
+ logChannel.write(sizeBuffer);
+
+ long pos = logChannel.position();
+ logChannel.write(entry);
+ logChannel.registerWrittenEntry(ledger, entrySize);
+
+ return (logChannel.getLogId() << 32L) | pos;
+ }
+
+ boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {
+ if (logChannel == null) {
+ return false;
+ }
+ return logChannel.position() + size > logSizeLimit;
+ }
+
+ boolean readEntryLogHardLimit(BufferedLogChannel logChannel, long size) {
+ if (logChannel == null) {
+ return false;
+ }
+ return logChannel.position() + size > Integer.MAX_VALUE;
+ }
+
+ abstract BufferedLogChannel getCurrentLogForLedger(long ledgerId);
+
+ abstract BufferedLogChannel getCurrentLogForLedgerForAddEntry(long
ledgerId, int entrySize, boolean rollLog)
+ throws IOException;
+
+ abstract void setCurrentLogForLedgerAndAddToRotate(long ledgerId,
BufferedLogChannel logChannel);
+
+ /*
+ * flush current logs.
+ */
+ abstract void flushCurrentLogs() throws IOException;
+
+ /*
+ * flush rotated logs.
+ */
+ abstract void flushRotatedLogs() throws IOException;
+
+ List<BufferedLogChannel> getRotatedLogChannels() {
+ return rotatedLogChannels;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ flushCurrentLogs();
+ flushRotatedLogs();
+ }
+
+ void flushLogChannel(BufferedLogChannel logChannel, boolean forceMetadata)
throws IOException {
+ if (logChannel != null) {
+ logChannel.flushAndForceWrite(forceMetadata);
+ log.debug("Flush and sync current entry logger {}",
logChannel.getLogId());
+ }
+ }
+
+ /*
+ * Creates a new log file. This method should be guarded by a lock,
+ * so callers of this method should be in right scope of the lock.
+ */
+ void createNewLog(long ledgerId) throws IOException {
+ BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId);
+ // First try to create a new log channel. Add the current log channel to the
ToFlush list only when
+ // there is a new log channel. This prevents a log channel from being
referenced by both
+ // *logChannel* and the *ToFlush* list.
+ if (null != logChannel) {
+
+ // flush the internal buffer back to filesystem but not sync disk
+ logChannel.flush();
+
+ // Append ledgers map at the end of entry log
+ logChannel.appendLedgersMap();
+
+ BufferedLogChannel newLogChannel =
entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
+ setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
+ log.info("Flushing entry logger {} back to filesystem, pending for
syncing entry loggers : {}.",
+ logChannel.getLogId(), rotatedLogChannels);
+ for (EntryLogListener listener : listeners) {
+ listener.onRotateEntryLog();
+ }
+ } else {
+ setCurrentLogForLedgerAndAddToRotate(ledgerId,
+
entryLoggerAllocator.createNewLog(selectDirForNextEntryLog()));
+ }
+ }
+
+ File selectDirForNextEntryLog() throws NoWritableLedgerDirException {
+ return
getDirForNextEntryLog(ledgerDirsManager.getWritableLedgerDirsForNewLog());
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
new file mode 100644
index 0000000..84e4ad3
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
@@ -0,0 +1,263 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static org.apache.bookkeeper.bookie.EntryLogger.INVALID_LID;
+import static org.apache.bookkeeper.bookie.EntryLogger.UNASSIGNED_LEDGERID;
+
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+@Slf4j
+class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
+
+ private volatile BufferedLogChannel activeLogChannel;
+ private long logIdBeforeFlush = INVALID_LID;
+ private final AtomicBoolean shouldCreateNewEntryLog = new
AtomicBoolean(false);
+ private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+
+ EntryLogManagerForSingleEntryLog(ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager,
+ EntryLoggerAllocator entryLoggerAllocator,
List<EntryLogger.EntryLogListener> listeners,
+ EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) {
+ super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
+ this.rotatedLogChannels = new LinkedList<BufferedLogChannel>();
+ this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+ // Register listener for disk full notifications.
+ ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+ }
+
+ private LedgerDirsListener getLedgerDirsListener() {
+ return new LedgerDirsListener() {
+ @Override
+ public void diskFull(File disk) {
+ // If the current entry log disk is full, then create new
+ // entry log.
+ BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+ if (currentActiveLogChannel != null
+ &&
currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
+ shouldCreateNewEntryLog.set(true);
+ }
+ }
+
+ @Override
+ public void diskAlmostFull(File disk) {
+ // If the current entry log disk is almost full, then create
new entry
+ // log.
+ BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+ if (currentActiveLogChannel != null
+ &&
currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
+ shouldCreateNewEntryLog.set(true);
+ }
+ }
+ };
+ }
+
+ @Override
+ public synchronized long addEntry(long ledger, ByteBuf entry, boolean
rollLog) throws IOException {
+ return super.addEntry(ledger, entry, rollLog);
+ }
+
+ @Override
+ synchronized BufferedLogChannel getCurrentLogForLedgerForAddEntry(long
ledgerId, int entrySize,
+ boolean rollLog) throws IOException {
+ if (null == activeLogChannel) {
+ // log channel can be null because creation of the file is
deferred
+ createNewLog(UNASSIGNED_LEDGERID);
+ }
+
+ boolean reachEntryLogLimit = rollLog ?
reachEntryLogLimit(activeLogChannel, entrySize)
+ : readEntryLogHardLimit(activeLogChannel, entrySize);
+ // Create new log if logSizeLimit reached or current disk is full
+ boolean createNewLog = shouldCreateNewEntryLog.get();
+ if (createNewLog || reachEntryLogLimit) {
+ if (activeLogChannel != null) {
+ activeLogChannel.flushAndForceWriteIfRegularFlush(false);
+ }
+ createNewLog(UNASSIGNED_LEDGERID);
+ // Reset the flag
+ if (createNewLog) {
+ shouldCreateNewEntryLog.set(false);
+ }
+ }
+ return activeLogChannel;
+ }
+
+ @Override
+ synchronized void createNewLog(long ledgerId) throws IOException {
+ super.createNewLog(ledgerId);
+ }
+
+ @Override
+ public synchronized void setCurrentLogForLedgerAndAddToRotate(long
ledgerId, BufferedLogChannel logChannel) {
+ BufferedLogChannel hasToRotateLogChannel = activeLogChannel;
+ activeLogChannel = logChannel;
+ if (hasToRotateLogChannel != null) {
+ rotatedLogChannels.add(hasToRotateLogChannel);
+ }
+ }
+
+ @Override
+ public BufferedLogChannel getCurrentLogForLedger(long ledgerId) {
+ return activeLogChannel;
+ }
+
+ @Override
+ public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+ BufferedLogChannel activeLogChannelTemp = activeLogChannel;
+ if ((activeLogChannelTemp != null) && (activeLogChannelTemp.getLogId()
== entryLogId)) {
+ return activeLogChannelTemp;
+ }
+ return null;
+ }
+
+ @Override
+ public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
+ Collections.shuffle(writableLedgerDirs);
+ return writableLedgerDirs.get(0);
+ }
+
+ @Override
+ public void checkpoint() throws IOException {
+ flushRotatedLogs();
+ }
+
+ public long getCurrentLogId() {
+ BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+ if (currentActiveLogChannel != null) {
+ return currentActiveLogChannel.getLogId();
+ } else {
+ return EntryLogger.UNINITIALIZED_LOG_ID;
+ }
+ }
+
+ @Override
+ public void flushCurrentLogs() throws IOException {
+ BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+ if (currentActiveLogChannel != null) {
+ /**
+ * flushCurrentLogs method is called during checkpoint, so
+ * metadata of the file also should be force written.
+ */
+ flushLogChannel(currentActiveLogChannel, true);
+ }
+ }
+
+ @Override
+ void flushRotatedLogs() throws IOException {
+ List<BufferedLogChannel> channels = null;
+ synchronized (this) {
+ channels = rotatedLogChannels;
+ rotatedLogChannels = new LinkedList<BufferedLogChannel>();
+ }
+ if (null == channels) {
+ return;
+ }
+ Iterator<BufferedLogChannel> chIter = channels.iterator();
+ while (chIter.hasNext()) {
+ BufferedLogChannel channel = chIter.next();
+ try {
+ channel.flushAndForceWrite(true);
+ } catch (IOException ioe) {
+ // rescue from flush exception, add unflushed channels back
+ synchronized (this) {
+ if (null == rotatedLogChannels) {
+ rotatedLogChannels = channels;
+ } else {
+ rotatedLogChannels.addAll(0, channels);
+ }
+ }
+ throw ioe;
+ }
+ // remove the channel from the list after it is successfully
flushed
+ chIter.remove();
+ // since this channel is only used for writing, after flushing the
channel,
+ // we have to close the underlying file channel. Otherwise, we
might end up
+ // leaking fds, which would prevent the disk space from being reclaimed.
+ EntryLogger.closeFileChannel(channel);
+
recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
+ log.info("Synced entry logger {} to disk.", channel.getLogId());
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (activeLogChannel != null) {
+ EntryLogger.closeFileChannel(activeLogChannel);
+ }
+ }
+
+ @Override
+ public void forceClose() {
+ if (activeLogChannel != null) {
+ EntryLogger.forceCloseFileChannel(activeLogChannel);
+ }
+ }
+
+ @Override
+ public void prepareEntryMemTableFlush() {
+ logIdBeforeFlush = getCurrentLogId();
+ }
+
+ @Override
+ public boolean commitEntryMemTableFlush() throws IOException {
+ long logIdAfterFlush = getCurrentLogId();
+ /*
+ * in any case that an entry log reaches the limit, we roll the log
+ * and start checkpointing. if a memory table is flushed spanning
+ * over two entry log files, we also roll log. this is a
+ * performance consideration, since we don't want to checkpoint a new
+ * log file that ledger storage is writing to.
+ */
+ if (reachEntryLogLimit(activeLogChannel, 0L) || logIdAfterFlush !=
logIdBeforeFlush) {
+ log.info("Rolling entry logger since it reached size limitation");
+ createNewLog(UNASSIGNED_LEDGERID);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed)
throws IOException{
+ if (numBytesFlushed > 0) {
+ // if bytes are added between the previous flush and this checkpoint,
+ // it means bytes might live in the current active entry log, so we need to
+ // roll the current entry log and then issue a checkpoint to the underlying
+ // interleaved ledger storage.
+ createNewLog(UNASSIGNED_LEDGERID);
+ }
+ }
+
+ @Override
+ public EntryLogger.BufferedLogChannel createNewLogForCompaction() throws
IOException {
+ return
entryLoggerAllocator.createNewLogForCompaction(selectDirForNextEntryLog());
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index d289b0e..75445c2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -22,8 +22,6 @@
package org.apache.bookkeeper.bookie;
import static com.google.common.base.Charsets.UTF_8;
-import static
org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
-import static
org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
@@ -35,18 +33,14 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.FastThreadLocal;
-
import java.io.BufferedReader;
-import java.io.BufferedWriter;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@@ -54,24 +48,15 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
@@ -90,7 +75,7 @@ public class EntryLogger {
private static final Logger LOG =
LoggerFactory.getLogger(EntryLogger.class);
static final long UNASSIGNED_LEDGERID = -1L;
// log file suffix
- private static final String LOG_FILE_SUFFIX = ".log";
+ static final String LOG_FILE_SUFFIX = ".log";
@VisibleForTesting
static final int UNINITIALIZED_LOG_ID = -0xDEAD;
@@ -145,7 +130,7 @@ public class EntryLogger {
* Append the ledger map at the end of the entry log.
* Updates the entry log file header with the offset and size of the
map.
*/
- private void appendLedgersMap() throws IOException {
+ void appendLedgersMap() throws IOException {
long ledgerMapOffset = this.position();
@@ -228,21 +213,16 @@ public class EntryLogger {
*/
private final Object compactionLogLock = new Object();
- /**
- * The maximum size of a entry logger file.
- */
- final long logSizeLimit;
private volatile BufferedLogChannel compactionLogChannel;
final EntryLoggerAllocator entryLoggerAllocator;
private final EntryLogManager entryLogManager;
- private final boolean entryLogPreAllocationEnabled;
private final CopyOnWriteArrayList<EntryLogListener> listeners = new
CopyOnWriteArrayList<EntryLogListener>();
private static final int HEADER_V0 = 0; // Old log file format (no ledgers
map index)
private static final int HEADER_V1 = 1; // Introduced ledger map index
- private static final int HEADER_CURRENT_VERSION = HEADER_V1;
+ static final int HEADER_CURRENT_VERSION = HEADER_V1;
private static class Header {
final int version;
@@ -360,9 +340,6 @@ public class EntryLogger {
if (listener != null) {
addListener(listener);
}
- // log size limit
- this.logSizeLimit = Math.min(conf.getEntryLogSizeLimit(),
MAX_LOG_SIZE_LIMIT);
- this.entryLogPreAllocationEnabled =
conf.isEntryLogFilePreAllocationEnabled();
// Initialize the entry log header buffer. This cannot be a static
object
// since in our unit tests, we run multiple Bookies and thus
EntryLoggers
@@ -386,9 +363,11 @@ public class EntryLogger {
}
}
this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId
+ 1);
- this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
+ this.entryLoggerAllocator = new EntryLoggerAllocator(conf,
ledgerDirsManager, recentlyCreatedEntryLogsStatus,
+ logId);
if (entryLogPerLedgerEnabled) {
- this.entryLogManager = new
EntryLogManagerForSingleEntryLog(ledgerDirsManager) {
+ this.entryLogManager = new EntryLogManagerForSingleEntryLog(conf,
ledgerDirsManager, entryLoggerAllocator,
+ listeners, recentlyCreatedEntryLogsStatus) {
@Override
public void checkpoint() throws IOException {
/*
@@ -445,7 +424,8 @@ public class EntryLogger {
}
};
} else {
- this.entryLogManager = new
EntryLogManagerForSingleEntryLog(ledgerDirsManager);
+ this.entryLogManager = new EntryLogManagerForSingleEntryLog(conf,
ledgerDirsManager, entryLoggerAllocator,
+ listeners, recentlyCreatedEntryLogsStatus);
}
}
@@ -584,137 +564,6 @@ public class EntryLogger {
}
/**
- * An allocator pre-allocates entry log files.
- */
- class EntryLoggerAllocator {
-
- private long preallocatedLogId;
- private Future<BufferedLogChannel> preallocation = null;
- private ExecutorService allocatorExecutor;
- private final Object createEntryLogLock = new Object();
- private final Object createCompactionLogLock = new Object();
-
- EntryLoggerAllocator(long logId) {
- preallocatedLogId = logId;
- allocatorExecutor = Executors.newSingleThreadExecutor();
- }
-
- synchronized long getPreallocatedLogId() {
- return preallocatedLogId;
- }
-
- BufferedLogChannel createNewLog() throws IOException {
- synchronized (createEntryLogLock) {
- BufferedLogChannel bc;
- if (!entryLogPreAllocationEnabled){
- // create a new log directly
- bc = allocateNewLog();
- return bc;
- } else {
- // allocate directly to response request
- if (null == preallocation){
- bc = allocateNewLog();
- } else {
- // has a preallocated entry log
- try {
- bc = preallocation.get();
- } catch (ExecutionException ee) {
- if (ee.getCause() instanceof IOException) {
- throw (IOException) (ee.getCause());
- } else {
- throw new IOException("Error to execute entry
log allocation.", ee);
- }
- } catch (CancellationException ce) {
- throw new IOException("Task to allocate a new
entry log is cancelled.", ce);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Intrrupted when waiting a
new entry log to be allocated.", ie);
- }
- }
- // preallocate a new log in background upon every call
- preallocation = allocatorExecutor.submit(() ->
allocateNewLog());
- return bc;
- }
- }
- }
-
- BufferedLogChannel createNewLogForCompaction() throws IOException {
- synchronized (createCompactionLogLock) {
- return allocateNewLog(COMPACTING_SUFFIX);
- }
- }
-
- private synchronized BufferedLogChannel allocateNewLog() throws
IOException {
- return allocateNewLog(".log");
- }
-
- /**
- * Allocate a new log file.
- */
- private synchronized BufferedLogChannel allocateNewLog(String suffix)
throws IOException {
- List<File> list =
ledgerDirsManager.getWritableLedgerDirsForNewLog();
- File dirForNextEntryLog =
entryLogManager.getDirForNextEntryLog(list);
-
- List<File> ledgersDirs = ledgerDirsManager.getAllLedgerDirs();
- String logFileName;
- // It would better not to overwrite existing entry log files
- File testLogFile = null;
- do {
- if (preallocatedLogId >= Integer.MAX_VALUE) {
- preallocatedLogId = 0;
- } else {
- ++preallocatedLogId;
- }
- logFileName = Long.toHexString(preallocatedLogId) + suffix;
- for (File dir : ledgersDirs) {
- testLogFile = new File(dir, logFileName);
- if (testLogFile.exists()) {
- LOG.warn("Found existed entry log " + testLogFile
- + " when trying to create it as a new log.");
- testLogFile = null;
- break;
- }
- }
- } while (testLogFile == null);
-
- File newLogFile = new File(dirForNextEntryLog, logFileName);
- FileChannel channel = new RandomAccessFile(newLogFile,
"rw").getChannel();
-
- BufferedLogChannel logChannel = new BufferedLogChannel(channel,
conf.getWriteBufferBytes(),
- conf.getReadBufferBytes(), preallocatedLogId, newLogFile,
conf.getFlushIntervalInBytes());
- logfileHeader.readerIndex(0);
- logChannel.write(logfileHeader);
-
- for (File f : list) {
- setLastLogId(f, preallocatedLogId);
- }
-
- if (suffix.equals(LOG_FILE_SUFFIX)) {
-
recentlyCreatedEntryLogsStatus.createdEntryLog(preallocatedLogId);
- }
-
- LOG.info("Created new entry log file {} for logId {}.",
newLogFile, preallocatedLogId);
- return logChannel;
- }
-
- /**
- * Stop the allocator.
- */
- void stop() {
- // wait until the preallocation finished.
- allocatorExecutor.shutdown();
- LOG.info("Stopped entry logger preallocator.");
- }
-
- /**
- * get the preallocation for tests.
- */
- Future<BufferedLogChannel> getPreallocationFuture(){
- return preallocation;
- }
- }
-
- /**
* Remove entry log.
*
* @param entryLogId
@@ -736,27 +585,6 @@ public class EntryLogger {
return true;
}
- /**
- * writes the given id to the "lastId" file in the given directory.
- */
- private void setLastLogId(File dir, long logId) throws IOException {
- FileOutputStream fos;
- fos = new FileOutputStream(new File(dir, "lastId"));
- BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos,
UTF_8));
- try {
- bw.write(Long.toHexString(logId) + "\n");
- bw.flush();
- } catch (IOException e) {
- LOG.warn("Failed write lastId file");
- } finally {
- try {
- bw.close();
- } catch (IOException e) {
- LOG.error("Could not close lastId file in {}", dir.getPath());
- }
- }
- }
-
private long getLastLogId(File dir) {
long id = readLastLogId(dir);
// read success
@@ -812,396 +640,6 @@ public class EntryLogger {
}
}
- interface EntryLogManager {
-
- /*
- * add entry to the corresponding entrylog and return the position of
- * the entry in the entrylog
- */
- long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws
IOException;
-
- /*
- * gets the active logChannel with the given entryLogId. null if it is
- * not existing.
- */
- BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
-
- /*
- * Returns eligible writable ledger dir for the creation next entrylog
- */
- File getDirForNextEntryLog(List<File> writableLedgerDirs);
-
- /*
- * Do the operations required for checkpoint.
- */
- void checkpoint() throws IOException;
-
- /*
- * flush both current and rotated logs.
- */
- void flush() throws IOException;
-
- /*
- * close current logs.
- */
- void close() throws IOException;
-
- /*
- * force close current logs.
- */
- void forceClose();
-
- /*
- *
- */
- void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws
IOException;
-
- /*
- * this method should be called before doing entrymemtable flush, it
- * would save the state of the entrylogger before entrymemtable flush
- * and commitEntryMemTableFlush would take appropriate action after
- * entrymemtable flush.
- */
- void prepareEntryMemTableFlush();
-
- /*
- * this method should be called after doing entrymemtable flush,it
would
- * take appropriate action after entrymemtable flush depending on the
- * current state of the entrylogger and the state of the entrylogger
- * during prepareEntryMemTableFlush.
- *
- * It is assumed that there would be corresponding
- * prepareEntryMemTableFlush for every commitEntryMemTableFlush and
both
- * would be called from the same thread.
- *
- * returns boolean value indicating whether EntryMemTable should do
checkpoint
- * after this commit method.
- */
- boolean commitEntryMemTableFlush() throws IOException;
- }
-
- abstract class EntryLogManagerBase implements EntryLogManager {
- volatile List<BufferedLogChannel> rotatedLogChannels;
-
- private final FastThreadLocal<ByteBuf> sizeBufferForAdd = new
FastThreadLocal<ByteBuf>() {
- @Override
- protected ByteBuf initialValue() throws Exception {
- return Unpooled.buffer(4);
- }
- };
-
- /*
- * This method should be guarded by a lock, so callers of this method
- * should be in the right scope of the lock.
- */
- @Override
- public long addEntry(long ledger, ByteBuf entry, boolean rollLog)
throws IOException {
- int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to
prepend the size
- BufferedLogChannel logChannel =
getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
- ByteBuf sizeBuffer = sizeBufferForAdd.get();
- sizeBuffer.clear();
- sizeBuffer.writeInt(entry.readableBytes());
- logChannel.write(sizeBuffer);
-
- long pos = logChannel.position();
- logChannel.write(entry);
- logChannel.registerWrittenEntry(ledger, entrySize);
-
- return (logChannel.getLogId() << 32L) | pos;
- }
-
- boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {
- if (logChannel == null) {
- return false;
- }
- return logChannel.position() + size > logSizeLimit;
- }
-
- boolean readEntryLogHardLimit(BufferedLogChannel logChannel, long
size) {
- if (logChannel == null) {
- return false;
- }
- return logChannel.position() + size > Integer.MAX_VALUE;
- }
-
- abstract BufferedLogChannel getCurrentLogForLedger(long ledgerId);
-
- abstract BufferedLogChannel getCurrentLogForLedgerForAddEntry(long
ledgerId, int entrySize, boolean rollLog)
- throws IOException;
-
- abstract void setCurrentLogForLedgerAndAddToRotate(long ledgerId,
BufferedLogChannel logChannel);
-
- /*
- * flush current logs.
- */
- abstract void flushCurrentLogs() throws IOException;
-
- /*
- * flush rotated logs.
- */
- abstract void flushRotatedLogs() throws IOException;
-
- List<BufferedLogChannel> getRotatedLogChannels() {
- return rotatedLogChannels;
- }
-
- @Override
- public void flush() throws IOException {
- flushCurrentLogs();
- flushRotatedLogs();
- }
-
- void flushLogChannel(BufferedLogChannel logChannel, boolean
forceMetadata) throws IOException {
- if (logChannel != null) {
- logChannel.flushAndForceWrite(forceMetadata);
- LOG.debug("Flush and sync current entry logger {}",
logChannel.getLogId());
- }
- }
-
- /*
- * Creates a new log file. This method should be guarded by a lock,
- * so callers of this method should be in right scope of the lock.
- */
- void createNewLog(long ledgerId) throws IOException {
- BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId);
- // first tried to create a new log channel. add current log
channel to ToFlush list only when
- // there is a new log channel. it would prevent that a log channel
is referenced by both
- // *logChannel* and *ToFlush* list.
- if (null != logChannel) {
-
- // flush the internal buffer back to filesystem but not sync
disk
- logChannel.flush();
-
- // Append ledgers map at the end of entry log
- logChannel.appendLedgersMap();
-
- BufferedLogChannel newLogChannel =
entryLoggerAllocator.createNewLog();
- setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
- LOG.info("Flushing entry logger {} back to filesystem, pending
for syncing entry loggers : {}.",
- logChannel.getLogId(), rotatedLogChannels);
- for (EntryLogListener listener : listeners) {
- listener.onRotateEntryLog();
- }
- } else {
- setCurrentLogForLedgerAndAddToRotate(ledgerId,
entryLoggerAllocator.createNewLog());
- }
- }
- }
-
- class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
-
- private volatile BufferedLogChannel activeLogChannel;
- private long logIdBeforeFlush = INVALID_LID;
- private final AtomicBoolean shouldCreateNewEntryLog = new
AtomicBoolean(false);
-
- EntryLogManagerForSingleEntryLog(LedgerDirsManager ledgerDirsManager) {
- this.rotatedLogChannels = new LinkedList<BufferedLogChannel>();
- // Register listener for disk full notifications.
- ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
- }
-
- private LedgerDirsListener getLedgerDirsListener() {
- return new LedgerDirsListener() {
- @Override
- public void diskFull(File disk) {
- // If the current entry log disk is full, then create new
- // entry log.
- BufferedLogChannel currentActiveLogChannel =
activeLogChannel;
- if (currentActiveLogChannel != null
- &&
currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
- shouldCreateNewEntryLog.set(true);
- }
- }
-
- @Override
- public void diskAlmostFull(File disk) {
- // If the current entry log disk is almost full, then
create new entry
- // log.
- BufferedLogChannel currentActiveLogChannel =
activeLogChannel;
- if (currentActiveLogChannel != null
- &&
currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
- shouldCreateNewEntryLog.set(true);
- }
- }
- };
- }
-
- @Override
- public synchronized long addEntry(long ledger, ByteBuf entry, boolean
rollLog) throws IOException {
- return super.addEntry(ledger, entry, rollLog);
- }
-
- @Override
- synchronized BufferedLogChannel getCurrentLogForLedgerForAddEntry(long
ledgerId, int entrySize,
- boolean rollLog) throws IOException {
- if (null == activeLogChannel) {
- // log channel can be null because the file is deferred to be
created
- createNewLog(UNASSIGNED_LEDGERID);
- }
-
- boolean reachEntryLogLimit = rollLog ?
reachEntryLogLimit(activeLogChannel, entrySize)
- : readEntryLogHardLimit(activeLogChannel, entrySize);
- // Create new log if logSizeLimit reached or current disk is full
- boolean createNewLog = shouldCreateNewEntryLog.get();
- if (createNewLog || reachEntryLogLimit) {
- if (activeLogChannel != null) {
- activeLogChannel.flushAndForceWriteIfRegularFlush(false);
- }
- createNewLog(UNASSIGNED_LEDGERID);
- // Reset the flag
- if (createNewLog) {
- shouldCreateNewEntryLog.set(false);
- }
- }
- return activeLogChannel;
- }
-
- @Override
- synchronized void createNewLog(long ledgerId) throws IOException {
- super.createNewLog(ledgerId);
- }
-
- @Override
- public synchronized void setCurrentLogForLedgerAndAddToRotate(long
ledgerId, BufferedLogChannel logChannel) {
- BufferedLogChannel hasToRotateLogChannel = activeLogChannel;
- activeLogChannel = logChannel;
- if (hasToRotateLogChannel != null) {
- rotatedLogChannels.add(hasToRotateLogChannel);
- }
- }
-
- @Override
- public BufferedLogChannel getCurrentLogForLedger(long ledgerId) {
- return activeLogChannel;
- }
-
- @Override
- public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
- BufferedLogChannel activeLogChannelTemp = activeLogChannel;
- if ((activeLogChannelTemp != null) &&
(activeLogChannelTemp.getLogId() == entryLogId)) {
- return activeLogChannelTemp;
- }
- return null;
- }
-
- @Override
- public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
- Collections.shuffle(writableLedgerDirs);
- return writableLedgerDirs.get(0);
- }
-
- @Override
- public void checkpoint() throws IOException {
- flushRotatedLogs();
- }
-
- public long getCurrentLogId() {
- BufferedLogChannel currentActiveLogChannel = activeLogChannel;
- if (currentActiveLogChannel != null) {
- return currentActiveLogChannel.getLogId();
- } else {
- return EntryLogger.UNINITIALIZED_LOG_ID;
- }
- }
-
- @Override
- public void flushCurrentLogs() throws IOException {
- BufferedLogChannel currentActiveLogChannel = activeLogChannel;
- if (currentActiveLogChannel != null) {
- /**
- * flushCurrentLogs method is called during checkpoint, so
- * metadata of the file also should be force written.
- */
- flushLogChannel(currentActiveLogChannel, true);
- }
- }
-
- @Override
- void flushRotatedLogs() throws IOException {
- List<BufferedLogChannel> channels = null;
- synchronized (this) {
- channels = rotatedLogChannels;
- rotatedLogChannels = new LinkedList<BufferedLogChannel>();
- }
- if (null == channels) {
- return;
- }
- Iterator<BufferedLogChannel> chIter = channels.iterator();
- while (chIter.hasNext()) {
- BufferedLogChannel channel = chIter.next();
- try {
- channel.flushAndForceWrite(true);
- } catch (IOException ioe) {
- // rescue from flush exception, add unflushed channels back
- synchronized (this) {
- if (null == rotatedLogChannels) {
- rotatedLogChannels = channels;
- } else {
- rotatedLogChannels.addAll(0, channels);
- }
- }
- throw ioe;
- }
- // remove the channel from the list after it is successfully
flushed
- chIter.remove();
- // since this channel is only used for writing, after flushing
the channel,
- // we had to close the underlying file channel. Otherwise, we
might end up
- // leaking fds which cause the disk spaces could not be
reclaimed.
- closeFileChannel(channel);
-
recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
- LOG.info("Synced entry logger {} to disk.",
channel.getLogId());
- }
- }
-
- @Override
- public void close() throws IOException {
- if (activeLogChannel != null) {
- closeFileChannel(activeLogChannel);
- }
- }
-
- @Override
- public void forceClose() {
- if (activeLogChannel != null) {
- forceCloseFileChannel(activeLogChannel);
- }
- }
-
- @Override
- public void prepareEntryMemTableFlush() {
- logIdBeforeFlush = getCurrentLogId();
- }
-
- @Override
- public boolean commitEntryMemTableFlush() throws IOException {
- long logIdAfterFlush = getCurrentLogId();
- /*
- * in any case that an entry log reaches the limit, we roll the log
- * and start checkpointing. if a memory table is flushed spanning
- * over two entry log files, we also roll log. this is for
- * performance consideration: since we don't wanna checkpoint a new
- * log file that ledger storage is writing to.
- */
- if (reachEntryLogLimit(activeLogChannel, 0L) || logIdAfterFlush !=
logIdBeforeFlush) {
- LOG.info("Rolling entry logger since it reached size
limitation");
- createNewLog(UNASSIGNED_LEDGERID);
- return true;
- }
- return false;
- }
-
- @Override
- public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed)
throws IOException{
- if (numBytesFlushed > 0) {
- // if bytes are added between previous flush and this
checkpoint,
- // it means bytes might live at current active entry log, we
need
- // roll current entry log and then issue checkpoint to
underlying
- // interleaved ledger storage.
- createNewLog(UNASSIGNED_LEDGERID);
- }
- }
- }
-
/**
* Flushes all rotated log channels. After log channels are flushed,
* move leastUnflushedLogId ptr to current logId.
@@ -1273,7 +711,7 @@ public class EntryLogger {
void createNewCompactionLog() throws IOException {
synchronized (compactionLogLock) {
if (compactionLogChannel == null) {
- compactionLogChannel =
entryLoggerAllocator.createNewLogForCompaction();
+ compactionLogChannel =
entryLogManager.createNewLogForCompaction();
}
}
}
@@ -1702,7 +1140,7 @@ public class EntryLogger {
entryLoggerAllocator.stop();
}
- private static void closeFileChannel(BufferedChannelBase channel) throws
IOException {
+ static void closeFileChannel(BufferedChannelBase channel) throws
IOException {
if (null == channel) {
return;
}
@@ -1713,7 +1151,7 @@ public class EntryLogger {
}
}
- private static void forceCloseFileChannel(BufferedChannelBase channel) {
+ static void forceCloseFileChannel(BufferedChannelBase channel) {
if (null == channel) {
return;
}
@@ -1784,4 +1222,4 @@ public class EntryLogger {
return leastUnflushedLogId;
}
}
-}
\ No newline at end of file
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
new file mode 100644
index 0000000..d33d7c4
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
@@ -0,0 +1,215 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static
org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * An allocator pre-allocates entry log files.
+ */
+@Slf4j
+class EntryLoggerAllocator {
+
+ private long preallocatedLogId;
+ private Future<BufferedLogChannel> preallocation = null;
+ private ExecutorService allocatorExecutor;
+ private final ServerConfiguration conf;
+ private final LedgerDirsManager ledgerDirsManager;
+ private final Object createEntryLogLock = new Object();
+ private final Object createCompactionLogLock = new Object();
+ private final EntryLogger.RecentEntryLogsStatus
recentlyCreatedEntryLogsStatus;
+ private final boolean entryLogPreAllocationEnabled;
+ final ByteBuf logfileHeader =
Unpooled.buffer(EntryLogger.LOGFILE_HEADER_SIZE);
+
+ /**
+ * Creates an allocator and prepares the header written at the start of every new entry log.
+ *
+ * @param conf server configuration; supplies the preallocation flag and the channel buffer settings
+ * @param ledgerDirsManager provides the ledger directories scanned for already existing log files
+ * @param recentlyCreatedEntryLogsStatus tracker notified when a regular entry log is created
+ * @param logId initial value of the log id counter; the counter is advanced before each allocation
+ */
+ EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager
ledgerDirsManager,
+ EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus,
long logId) {
+ this.conf = conf;
+ this.ledgerDirsManager = ledgerDirsManager;
+ this.preallocatedLogId = logId;
+ this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+ this.entryLogPreAllocationEnabled =
conf.isEntryLogFilePreAllocationEnabled();
+ // A single worker thread runs the background preallocation tasks.
+ this.allocatorExecutor = Executors.newSingleThreadExecutor();
+
+ // Initialize the entry log header buffer. This cannot be a static object
+ // since in our unit tests, we run multiple Bookies and thus EntryLoggers
+ // within the same JVM. All of these Bookie instances access this header
+ // so there can be race conditions when entry logs are rolled over and
+ // this header buffer is cleared before writing it into the new logChannel.
+ logfileHeader.writeBytes("BKLO".getBytes(UTF_8));
+ logfileHeader.writeInt(EntryLogger.HEADER_CURRENT_VERSION);
+ // Extend the readable region to the full header size so the whole header is written out.
+ logfileHeader.writerIndex(EntryLogger.LOGFILE_HEADER_SIZE);
+
+ }
+
+ /**
+ * Returns the current value of the log id counter: the constructor's {@code logId} until
+ * the first allocation, afterwards the id chosen by the most recent allocation attempt.
+ * Synchronized on this instance, like the allocateNewLog methods that advance the counter.
+ */
+ synchronized long getPreallocatedLogId() {
+ return preallocatedLogId;
+ }
+
+ /**
+  * Hands out the channel of the next regular entry log.
+  *
+  * <p>With preallocation disabled the log is created synchronously in
+  * {@code dirForNextEntryLog}. With preallocation enabled the log prepared in the
+  * background by the previous call is returned (it was created in the directory passed
+  * to that previous call), or a log is created synchronously when nothing has been
+  * preallocated; a new background allocation for {@code dirForNextEntryLog} is then
+  * submitted.
+  *
+  * @param dirForNextEntryLog ledger directory in which to create the log
+  * @return the buffered channel of the allocated entry log
+  * @throws IOException if the allocation fails or is cancelled, or if the calling thread
+  *         is interrupted while waiting for the preallocated log
+  */
+ BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException {
+     synchronized (createEntryLogLock) {
+         if (!entryLogPreAllocationEnabled) {
+             // create a new log directly
+             return allocateNewLog(dirForNextEntryLog);
+         }
+         BufferedLogChannel bc;
+         if (null == preallocation) {
+             // nothing preallocated: allocate directly to serve this request
+             bc = allocateNewLog(dirForNextEntryLog);
+         } else {
+             // has a preallocated entry log
+             try {
+                 bc = preallocation.get();
+             } catch (ExecutionException ee) {
+                 // Drop the failed future. Keeping it would make every later call re-read
+                 // the same failure, so the allocator could never recover.
+                 preallocation = null;
+                 if (ee.getCause() instanceof IOException) {
+                     throw (IOException) (ee.getCause());
+                 } else {
+                     throw new IOException("Error to execute entry log allocation.", ee);
+                 }
+             } catch (CancellationException ce) {
+                 // A cancelled future can never produce a log; drop it as well.
+                 preallocation = null;
+                 throw new IOException("Task to allocate a new entry log is cancelled.", ce);
+             } catch (InterruptedException ie) {
+                 // The background task may still finish, so the future is kept for the next call.
+                 Thread.currentThread().interrupt();
+                 throw new IOException("Interrupted when waiting a new entry log to be allocated.", ie);
+             }
+         }
+         // preallocate a new log in background upon every call
+         preallocation = allocatorExecutor.submit(() -> allocateNewLog(dirForNextEntryLog));
+         return bc;
+     }
+ }
+
+ /**
+ * Creates a new compaction log (file name suffix {@code COMPACTING_SUFFIX}) in the given
+ * directory. The log is always allocated synchronously; the preallocation future is not
+ * consulted. Calls are serialized on {@code createCompactionLogLock}.
+ *
+ * @param dirForNextEntryLog ledger directory in which to create the compaction log
+ * @return the buffered channel of the new compaction log
+ * @throws IOException if the allocation fails
+ */
+ BufferedLogChannel createNewLogForCompaction(File dirForNextEntryLog)
throws IOException {
+ synchronized (createCompactionLogLock) {
+ return allocateNewLog(dirForNextEntryLog, COMPACTING_SUFFIX);
+ }
+ }
+
+ /**
+  * Allocates a regular (non-compaction) entry log in the given directory.
+  *
+  * @param dirForNextEntryLog ledger directory in which to create the log
+  * @return the buffered channel of the new entry log
+  * @throws IOException if the log file cannot be created or its header cannot be written
+  */
+ private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog) throws IOException {
+     // Use the shared constant so the suffix always matches the LOG_FILE_SUFFIX check made by
+     // the two-argument overload when it decides whether to register the new log.
+     return allocateNewLog(dirForNextEntryLog, EntryLogger.LOG_FILE_SUFFIX);
+ }
+
+ /**
+ * Allocate a new log file.
+ *
+ * <p>Advances the log id counter until no file named {@code <hex id><suffix>} exists in any
+ * ledger directory, creates that file in {@code dirForNextEntryLog}, writes the log file
+ * header into it and records the chosen id in the "lastId" file of every ledger directory.
+ * When {@code suffix} equals {@code EntryLogger.LOG_FILE_SUFFIX} the new id is also reported
+ * to {@code recentlyCreatedEntryLogsStatus}.
+ *
+ * @param dirForNextEntryLog ledger directory in which to create the file
+ * @param suffix file name suffix appended to the hex log id
+ * @return the buffered channel of the new log, with the header already written to it
+ * @throws IOException if the file cannot be opened or the header cannot be written
+ */
+ private synchronized BufferedLogChannel allocateNewLog(File
dirForNextEntryLog, String suffix) throws IOException {
+ List<File> ledgersDirs = ledgerDirsManager.getAllLedgerDirs();
+ String logFileName;
+ // Avoid overwriting existing entry log files: probe every ledger directory for the
+ // candidate name and move on to the next id on a collision.
+ // NOTE(review): if ledgersDirs were empty, testLogFile would stay null and this loop
+ // would never terminate -- confirm getAllLedgerDirs() cannot return an empty list.
+ File testLogFile = null;
+ do {
+ // The id wraps around to 0 once it reaches Integer.MAX_VALUE.
+ if (preallocatedLogId >= Integer.MAX_VALUE) {
+ preallocatedLogId = 0;
+ } else {
+ ++preallocatedLogId;
+ }
+ logFileName = Long.toHexString(preallocatedLogId) + suffix;
+ for (File dir : ledgersDirs) {
+ testLogFile = new File(dir, logFileName);
+ if (testLogFile.exists()) {
+ log.warn("Found existed entry log " + testLogFile
+ + " when trying to create it as a new log.");
+ // null marks the collision so the outer loop retries with the next id
+ testLogFile = null;
+ break;
+ }
+ }
+ } while (testLogFile == null);
+
+ File newLogFile = new File(dirForNextEntryLog, logFileName);
+ // NOTE(review): the channel is not closed here if the statements below throw -- confirm
+ // whether callers can reach it for cleanup.
+ FileChannel channel = new RandomAccessFile(newLogFile,
"rw").getChannel();
+
+ BufferedLogChannel logChannel = new BufferedLogChannel(channel,
conf.getWriteBufferBytes(),
+ conf.getReadBufferBytes(), preallocatedLogId, newLogFile,
conf.getFlushIntervalInBytes());
+ // Rewind the shared header buffer so the full header is written into each new log.
+ logfileHeader.readerIndex(0);
+ logChannel.write(logfileHeader);
+
+ // Record the id in every ledger directory, not only the one holding the new file.
+ for (File f : ledgersDirs) {
+ setLastLogId(f, preallocatedLogId);
+ }
+
+ // Only logs with the regular suffix are reported to the recently-created tracker.
+ if (suffix.equals(EntryLogger.LOG_FILE_SUFFIX)) {
+ recentlyCreatedEntryLogsStatus.createdEntryLog(preallocatedLogId);
+ }
+
+ log.info("Created new entry log file {} for logId {}.", newLogFile,
preallocatedLogId);
+ return logChannel;
+ }
+
+ /**
+  * Writes the given id, as a hex string followed by a newline, to the "lastId" file in
+  * the given directory.
+  *
+  * <p>The update is best effort: write and close failures are logged (with their cause)
+  * and swallowed. Only a failure to open the file propagates to the caller.
+  *
+  * @param dir directory holding the "lastId" file
+  * @param logId entry log id to record
+  * @throws IOException if the "lastId" file cannot be opened for writing
+  */
+ private void setLastLogId(File dir, long logId) throws IOException {
+     FileOutputStream fos = new FileOutputStream(new File(dir, "lastId"));
+     BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos, UTF_8));
+     try {
+         bw.write(Long.toHexString(logId) + "\n");
+         bw.flush();
+     } catch (IOException e) {
+         // Keep the cause and the directory so a failing disk can be identified from the log.
+         log.warn("Failed write lastId file in {}", dir.getPath(), e);
+     } finally {
+         try {
+             bw.close();
+         } catch (IOException e) {
+             log.error("Could not close lastId file in {}", dir.getPath(), e);
+         }
+     }
+ }
+
+ /**
+ * Stop the allocator.
+ *
+ * <p>Initiates an orderly shutdown of the allocator executor so that no further
+ * preallocation tasks are accepted. This method does not block.
+ */
+ void stop() {
+ // shutdown() does not wait: a preallocation that was already submitted may still be
+ // running when this method returns (awaitTermination would be needed to wait for it).
+ allocatorExecutor.shutdown();
+ log.info("Stopped entry logger preallocator.");
+ }
+
+ /**
+ * Returns the future of the most recently submitted background allocation, or
+ * {@code null} if none has been submitted. Intended for tests.
+ *
+ * <p>NOTE(review): the field is read without holding {@code createEntryLogLock}, under
+ * which it is written -- confirm that test callers do not race with createNewLog.
+ */
+ Future<BufferedLogChannel> getPreallocationFuture(){
+ return preallocation;
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index 4257ccc..4c4514a 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
-import
org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.util.DiskChecker;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 99c1ad4..ac64335 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -49,9 +49,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManager;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
-import
org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -855,7 +852,7 @@ public class EntryLogTest {
conf.setEntryLogPerLedgerEnabled(false);
EntryLogger newEntryLogger = new EntryLogger(conf, ledgerDirsManager);
EntryLogManager newEntryLogManager =
newEntryLogger.getEntryLogManager();
- Assert.assertEquals("EntryLogManager class type",
EntryLogger.EntryLogManagerForSingleEntryLog.class,
+ Assert.assertEquals("EntryLogManager class type",
EntryLogManagerForSingleEntryLog.class,
newEntryLogManager.getClass());
ByteBuf buf = newEntryLogger.readEntry(ledgerId, 0L, entry0Position);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
index 6cbdcde..604099c 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
@@ -43,7 +43,6 @@ import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
import org.apache.bookkeeper.bookie.Journal.LastLogMark;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index 9642c18..f3ef108 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -36,7 +36,6 @@ import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
-import
org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.After;
--
To stop receiving notification emails like this one, please contact
[email protected].