sijie commented on a change in pull request #1281: Issue #570: Introducing
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r179294075
##########
File path:
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
##########
@@ -802,88 +881,230 @@ private long readLastLogId(File f) {
}
}
+ interface EntryLogManager {
+ /*
+ * acquire lock for this ledger.
+ */
+ void acquireLock(Long ledgerId);
+
+ /*
+ * acquire lock for this ledger if it is not already available for this
+ * ledger then it will create a new one and then acquire lock.
+ */
+ void acquireLockByCreatingIfRequired(Long ledgerId);
+
+ /*
+ * release lock for this ledger
+ */
+ void releaseLock(Long ledgerId);
+
+ /*
+ * sets the logChannel for the given ledgerId. The previous one will be
+ * removed from replicaOfCurrentLogChannels. Previous logChannel will
be
+ * added to rotatedLogChannels.
+ */
+ void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel
logChannel);
+
+ /*
+ * gets the logChannel for the given ledgerId.
+ */
+ BufferedLogChannel getCurrentLogForLedger(Long ledgerId);
+
+ /*
+ * gets the copy of rotatedLogChannels
+ */
+ Set<BufferedLogChannel> getCopyOfRotatedLogChannels();
+
+ /*
+ * gets the copy of replicaOfCurrentLogChannels
+ */
+ Set<BufferedLogChannel> getCopyOfCurrentLogs();
+
+ /*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+ BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+ /*
+ * removes the logChannel from rotatedLogChannels collection
+ */
+ void removeFromRotatedLogChannels(BufferedLogChannel
rotatedLogChannelToRemove);
+
+ /*
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+ File getDirForNextEntryLog(List<File> writableLedgerDirs);
+
+ /*
+ * Do the operations required for checkpoint.
+ */
+ void checkpoint() throws IOException;
+
+ /*
+ * roll entryLogs.
+ */
+ void rollLogs() throws IOException;
+
+ /*
+ * gets the log id of the current activeEntryLog. for
+ * EntryLogManagerForSingleEntryLog it returns logid of current
+ * activelog, for EntryLogManagerForEntryLogPerLedger it would return
+ * some constant value.
+ */
+ long getCurrentLogId();
+ }
+
+ class EntryLogManagerForSingleEntryLog implements EntryLogManager {
+
+ private volatile BufferedLogChannel activeLogChannel;
+ private Lock lockForActiveLogChannel;
+ private final Set<BufferedLogChannel> rotatedLogChannels;
+
+ EntryLogManagerForSingleEntryLog() {
+ rotatedLogChannels = ConcurrentHashMap.newKeySet();
+ lockForActiveLogChannel = new ReentrantLock();
+ }
+
+ /*
+ * since entryLogPerLedger is not enabled, it is just one lock for all
+ * ledgers.
+ */
+ @Override
+ public void acquireLock(Long ledgerId) {
+ lockForActiveLogChannel.lock();
+ }
+
+ @Override
+ public void acquireLockByCreatingIfRequired(Long ledgerId) {
+ acquireLock(ledgerId);
+ }
+
+ @Override
+ public void releaseLock(Long ledgerId) {
+ lockForActiveLogChannel.unlock();
+ }
+
+ @Override
+ public void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel
logChannel) {
+ acquireLock(ledgerId);
+ try {
+ BufferedLogChannel hasToRotateLogChannel = activeLogChannel;
+ activeLogChannel = logChannel;
+ if (hasToRotateLogChannel != null) {
+ rotatedLogChannels.add(hasToRotateLogChannel);
+ }
+ } finally {
+ releaseLock(ledgerId);
+ }
+ }
+
+ @Override
+ public BufferedLogChannel getCurrentLogForLedger(Long ledgerId) {
+ return activeLogChannel;
+ }
+
+ @Override
+ public Set<BufferedLogChannel> getCopyOfRotatedLogChannels() {
+ return new HashSet<BufferedLogChannel>(rotatedLogChannels);
+ }
+
+ @Override
+ public Set<BufferedLogChannel> getCopyOfCurrentLogs() {
+ HashSet<BufferedLogChannel> copyOfCurrentLogs = new
HashSet<BufferedLogChannel>();
+ copyOfCurrentLogs.add(activeLogChannel);
+ return copyOfCurrentLogs;
+ }
+
+ @Override
+ public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+ BufferedLogChannel activeLogChannelTemp = activeLogChannel;
+ if ((activeLogChannelTemp != null) &&
(activeLogChannelTemp.getLogId() == entryLogId)) {
+ return activeLogChannelTemp;
+ }
+ return null;
+ }
+
+ @Override
+ public void removeFromRotatedLogChannels(BufferedLogChannel
rotatedLogChannelToRemove) {
+ rotatedLogChannels.remove(rotatedLogChannelToRemove);
+ }
+
+ @Override
+ public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
+ Collections.shuffle(writableLedgerDirs);
+ return writableLedgerDirs.get(0);
+ }
+
+ @Override
+ public void checkpoint() throws IOException {
+ flushRotatedLogs();
+ }
+
+ @Override
+ public void rollLogs() throws IOException {
+ createNewLog(INVALID_LEDGERID);
+ }
+
+ @Override
+ public long getCurrentLogId() {
+ return activeLogChannel.getLogId();
+ }
+ }
+
/**
* Flushes all rotated log channels. After log channels are flushed,
* move leastUnflushedLogId ptr to current logId.
*/
void checkpoint() throws IOException {
- flushRotatedLogs();
- /*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
- *
- * TODO: When EntryLogManager is introduced in the subsequent
sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based
logic.
- *
- */
- if (entryLogPerLedgerEnabled) {
- flushCurrentLog();
- }
+ entryLogManager.checkpoint();
}
void flushRotatedLogs() throws IOException {
- List<BufferedLogChannel> channels = null;
- long flushedLogId = INVALID_LID;
- synchronized (this) {
- channels = logChannelsToFlush;
- logChannelsToFlush = null;
- }
+ Set<BufferedLogChannel> channels =
entryLogManager.getCopyOfRotatedLogChannels();
if (null == channels) {
return;
}
- Iterator<BufferedLogChannel> chIter = channels.iterator();
- while (chIter.hasNext()) {
- BufferedLogChannel channel = chIter.next();
- try {
- channel.flushAndForceWrite(false);
- } catch (IOException ioe) {
- // rescue from flush exception, add unflushed channels back
- synchronized (this) {
- if (null == logChannelsToFlush) {
- logChannelsToFlush = channels;
- } else {
- logChannelsToFlush.addAll(0, channels);
- }
- }
- throw ioe;
- }
- // remove the channel from the list after it is successfully
flushed
- chIter.remove();
+ for (BufferedLogChannel channel : channels) {
+ channel.flushAndForceWrite(true);
// since this channel is only used for writing, after flushing the
channel,
// we had to close the underlying file channel. Otherwise, we
might end up
// leaking fds which cause the disk spaces could not be reclaimed.
closeFileChannel(channel);
- if (channel.getLogId() > flushedLogId) {
- flushedLogId = channel.getLogId();
- }
+
recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
+ entryLogManager.removeFromRotatedLogChannels(channel);
LOG.info("Synced entry logger {} to disk.", channel.getLogId());
}
- // move the leastUnflushedLogId ptr
- leastUnflushedLogId = flushedLogId + 1;
}
public void flush() throws IOException {
+ flushCurrentLogs();
Review comment:
Why did you change the sequence here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services