sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180515728
 
 

 ##########
 File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##########
 @@ -788,89 +852,340 @@ private long readLastLogId(File f) {
         }
     }
 
-    /**
-     * Flushes all rotated log channels. After log channels are flushed,
-     * move leastUnflushedLogId ptr to current logId.
-     */
-    void checkpoint() throws IOException {
-        flushRotatedLogs();
+    interface EntryLogManager {
+
+        /*
+         * add entry to the corresponding entrylog and return the position of
+         * the entry in the entrylog
+         */
 +        long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+        /*
+         * gets the active logChannel with the given entryLogId. null if it is
+         * not existing.
+         */
+        BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+        /*
+         * Returns eligible writable ledger dir for the creation next entrylog
+         */
+        File getDirForNextEntryLog(List<File> writableLedgerDirs);
+
+        /*
+         * Do the operations required for checkpoint.
+         */
+        void checkpoint() throws IOException;
+
+        /*
+         * roll entryLogs.
+         */
+        void rollLogs() throws IOException;
+
+        /*
+         * flush rotated logs.
+         */
+        void flushRotatedLogs() throws IOException;
+
+        /*
+         * flush current logs.
+         */
+        void flushCurrentLogs() throws IOException;
+
+        /*
+         * close current logs.
+         */
+        void closeCurrentLogs() throws IOException;
+
+        /*
+         * force close current logs.
+         */
+        void forceCloseCurrentLogs();
+
+        /*
+         * this method should be called before doing entrymemtable flush, it
+         * would save the state of the entrylogger before entrymemtable flush
+         * and commitEntryMemTableFlush would take appropriate action after
+         * entrymemtable flush.
+         */
+        void prepareEntryMemTableFlush();
+
         /*
-         * In the case of entryLogPerLedgerEnabled we need to flush both
-         * rotatedlogs and currentlogs. This is needed because syncThread
-         * periodically does checkpoint and at this time all the logs should
-         * be flushed.
 +         * this method should be called after doing entrymemtable flush,it would
+         * take appropriate action after entrymemtable flush depending on the
+         * current state of the entrylogger and the state of the entrylogger
+         * during prepareEntryMemTableFlush.
          *
 -         * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of
-         * this Issue, I will move this logic to individual implamentations of
 -         * EntryLogManager and it would be free of this booalen flag based logic.
+         * It is assumed that there would be corresponding
 +         * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both
+         * would be called from the same thread.
          *
 +         * returns boolean value indicating whether EntryMemTable should do checkpoint
+         * after this commit method.
          */
-        if (entryLogPerLedgerEnabled) {
-            flushCurrentLog();
-        }
+        boolean commitEntryMemTableFlush() throws IOException;
     }
 
-    void flushRotatedLogs() throws IOException {
-        List<BufferedLogChannel> channels = null;
-        long flushedLogId = INVALID_LID;
-        synchronized (this) {
-            channels = logChannelsToFlush;
-            logChannelsToFlush = null;
+    abstract class EntryLogManagerBase implements EntryLogManager {
+        final Set<BufferedLogChannel> rotatedLogChannels;
+
+        EntryLogManagerBase() {
+            rotatedLogChannels = ConcurrentHashMap.newKeySet();
         }
-        if (null == channels) {
-            return;
+
+        /*
+         * This method should be guarded by a lock, so callers of this method
+         * should be in the right scope of the lock.
+         */
+        @Override
 +        public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+
 +            int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
+            BufferedLogChannel logChannel = getCurrentLogForLedger(ledger);
 +            boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(logChannel, entrySize)
+                    : readEntryLogHardLimit(logChannel, entrySize);
+            // Create new log if logSizeLimit reached or current disk is full
+            boolean diskFull = (logChannel == null) ? false
 +                    : ledgerDirsManager.isDirFull(logChannel.getLogFile().getParentFile());
+            boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs();
+
+            /**
+             * if disk of the logChannel is full or if the entrylog limit is
+             * reached of if the logchannel is not initialized, then
+             * createNewLog. If allDisks are full then proceed with the current
+             * logChannel, since Bookie must have turned to readonly mode and
+             * the addEntry traffic would be from GC and it is ok to proceed in
+             * this case.
+             */
 +            if ((diskFull && (!allDisksFull)) || reachEntryLogLimit || (logChannel == null)) {
+                flushCurrentLog(logChannel, false);
+                createNewLog(ledger);
+            }
+
+            logChannel = getCurrentLogForLedger(ledger);
+            ByteBuf sizeBuffer = EntryLogger.this.sizeBuffer.get();
+            sizeBuffer.clear();
+            sizeBuffer.writeInt(entry.readableBytes());
+            logChannel.write(sizeBuffer);
+
+            long pos = logChannel.position();
+            logChannel.write(entry);
+            logChannel.registerWrittenEntry(ledger, entrySize);
+
+            return (logChannel.getLogId() << 32L) | pos;
         }
-        Iterator<BufferedLogChannel> chIter = channels.iterator();
-        while (chIter.hasNext()) {
-            BufferedLogChannel channel = chIter.next();
-            try {
-                channel.flushAndForceWrite(false);
-            } catch (IOException ioe) {
-                // rescue from flush exception, add unflushed channels back
-                synchronized (this) {
-                    if (null == logChannelsToFlush) {
-                        logChannelsToFlush = channels;
-                    } else {
-                        logChannelsToFlush.addAll(0, channels);
-                    }
+
+        boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {
+            if (logChannel == null) {
+                return false;
+            }
+            return logChannel.position() + size > logSizeLimit;
+        }
+
 +        boolean readEntryLogHardLimit(BufferedLogChannel logChannel, long size) {
+            if (logChannel == null) {
+                return false;
+            }
+            return logChannel.position() + size > Integer.MAX_VALUE;
+        }
+
+        abstract BufferedLogChannel getCurrentLogForLedger(Long ledgerId);
+
 +        abstract void setCurrentLogForLedgerAndAddToRotate(Long ledgerId, BufferedLogChannel logChannel);
+
+        public Set<BufferedLogChannel> getCopyOfRotatedLogChannels() {
+            return new HashSet<BufferedLogChannel>(rotatedLogChannels);
+        }
+
+        @Override
+        public void flushRotatedLogs() throws IOException {
+            Set<BufferedLogChannel> channels = getCopyOfRotatedLogChannels();
+            for (BufferedLogChannel channel : channels) {
+                channel.flushAndForceWrite(true);
 +                // since this channel is only used for writing, after flushing the channel,
 +                // we had to close the underlying file channel. Otherwise, we might end up
 +                // leaking fds which cause the disk spaces could not be reclaimed.
+                closeFileChannel(channel);
 +                recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
+                rotatedLogChannels.remove(channel);
 +                LOG.info("Synced entry logger {} to disk.", channel.getLogId());
+            }
+        }
+
 +        void flushCurrentLog(BufferedLogChannel logChannel, boolean forceMetadata) throws IOException {
+            if (logChannel != null) {
+                logChannel.flushAndForceWrite(forceMetadata);
 +                LOG.debug("Flush and sync current entry logger {}", logChannel.getLogId());
+            }
+        }
+
+        /*
+         * Creates a new log file. This method should be guarded by a lock,
+         * so callers of this method should be in right scope of the lock.
+         */
+        void createNewLog(Long ledgerId) throws IOException {
+            BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId);
 +            // first tried to create a new log channel. add current log channel to ToFlush list only when
 +            // there is a new log channel. it would prevent that a log channel is referenced by both
+            // *logChannel* and *ToFlush* list.
+            if (null != logChannel) {
+
 +                // flush the internal buffer back to filesystem but not sync disk
+                logChannel.flush();
+
+                // Append ledgers map at the end of entry log
+                logChannel.appendLedgersMap();
+
 +                BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog();
+                setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
 +                LOG.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
+                        logChannel.getLogId(), getCopyOfRotatedLogChannels());
+                for (EntryLogListener listener : listeners) {
+                    listener.onRotateEntryLog();
                 }
-                throw ioe;
+            } else {
 +                setCurrentLogForLedgerAndAddToRotate(ledgerId, entryLoggerAllocator.createNewLog());
             }
 -            // remove the channel from the list after it is successfully flushed
-            chIter.remove();
 -            // since this channel is only used for writing, after flushing the channel,
 -            // we had to close the underlying file channel. Otherwise, we might end up
-            // leaking fds which cause the disk spaces could not be reclaimed.
-            closeFileChannel(channel);
-            if (channel.getLogId() > flushedLogId) {
-                flushedLogId = channel.getLogId();
+        }
+    }
+
+    class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
+
+        private volatile BufferedLogChannel activeLogChannel;
+        private long logIdBeforeFlush = INVALID_LID;
+
+        @Override
 +        public synchronized long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+            return super.addEntry(ledger, entry, rollLog);
+        }
+
+        @Override
+        synchronized void createNewLog(Long ledgerId) throws IOException {
+            super.createNewLog(ledgerId);
+        }
+
+        @Override
 +        public synchronized void setCurrentLogForLedgerAndAddToRotate(Long ledgerId, BufferedLogChannel logChannel) {
+            BufferedLogChannel hasToRotateLogChannel = activeLogChannel;
+            activeLogChannel = logChannel;
+            if (hasToRotateLogChannel != null) {
+                rotatedLogChannels.add(hasToRotateLogChannel);
+            }
+        }
+
+        @Override
+        public BufferedLogChannel getCurrentLogForLedger(Long ledgerId) {
+            return activeLogChannel;
+        }
+
+        @Override
+        public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+            BufferedLogChannel activeLogChannelTemp = activeLogChannel;
 +            if ((activeLogChannelTemp != null) && (activeLogChannelTemp.getLogId() == entryLogId)) {
+                return activeLogChannelTemp;
             }
-            LOG.info("Synced entry logger {} to disk.", channel.getLogId());
+            return null;
+        }
+
+        @Override
+        public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
+            Collections.shuffle(writableLedgerDirs);
+            return writableLedgerDirs.get(0);
+        }
+
+        @Override
+        public void checkpoint() throws IOException {
+            flushRotatedLogs();
+        }
+
+        @Override
+        public synchronized void rollLogs() throws IOException {
+            createNewLog(UNASSIGNED_LEDGERID);
+        }
+
+        public long getCurrentLogId() {
+            BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+            if (currentActiveLogChannel != null) {
+                return currentActiveLogChannel.getLogId();
+            } else {
+                return EntryLogger.UNINITIALIZED_LOG_ID;
+            }
+        }
+
+        @Override
+        public void flushCurrentLogs() throws IOException {
+            BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+            if (currentActiveLogChannel != null) {
+                /**
+                 * flushCurrentLogs method is called during checkpoint, so
+                 * metadata of the file also should be force written.
+                 */
+                flushCurrentLog(currentActiveLogChannel, true);
+            }
+        }
+
+        @Override
+        public void closeCurrentLogs() throws IOException {
+            if (activeLogChannel != null) {
+                closeFileChannel(activeLogChannel);
+            }
+        }
+
+        @Override
+        public void forceCloseCurrentLogs() {
+            if (activeLogChannel != null) {
+                forceCloseFileChannel(activeLogChannel);
+            }
+        }
+
+        @Override
+        public void prepareEntryMemTableFlush() {
+            logIdBeforeFlush = getCurrentLogId();
+        }
+
+        @Override
+        public boolean commitEntryMemTableFlush() throws IOException {
+            long logIdAfterFlush = getCurrentLogId();
+            /*
+             * in any case that an entry log reaches the limit, we roll the log
+             * and start checkpointing. if a memory table is flushed spanning
+             * over two entry log files, we also roll log. this is for
+             * performance consideration: since we don't wanna checkpoint a new
+             * log file that ledger storage is writing to.
+             */
 +            if (reachEntryLogLimit(activeLogChannel, 0L) || logIdAfterFlush != logIdBeforeFlush) {
 +                LOG.info("Rolling entry logger since it reached size limitation");
+                rollLogs();
+                return true;
+            }
+            return false;
         }
-        // move the leastUnflushedLogId ptr
-        leastUnflushedLogId = flushedLogId + 1;
     }
 
+    /**
+     * Flushes all rotated log channels. After log channels are flushed,
+     * move leastUnflushedLogId ptr to current logId.
+     */
+    void checkpoint() throws IOException {
+        entryLogManager.checkpoint();
+    }
+
+
+
     public void flush() throws IOException {
-        flushRotatedLogs();
-        flushCurrentLog();
+        entryLogManager.flushCurrentLogs();
 
 Review comment:
   for the single-log manager, it is better to flush `rotated logs` before flushing `current logs`, because of the way it rotates files and does the checkpoint. I would prefer keeping the existing logic unchanged to reduce the risk.
   
   besides that, why not provide a `flush` method in EntryLogManager? That way each entry log manager implementation can implement its own flushing logic rather than exposing this knowledge to EntryLogger. If you provide a `flush` method, you probably don't need to expose `flushCurrentLogs` and `flushRotatedLogs` at all.
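   
   To make that concrete, a rough sketch of the suggestion (illustrative only — the placement of `flush` and the method bodies below are assumptions, not code from this PR; class and method names follow the diff above):
   
   ```java
       interface EntryLogManager {
           // ... existing methods from the diff above ...
   
           /*
            * Flush everything this manager is responsible for; each
            * implementation decides the ordering of rotated vs. current logs.
            */
           void flush() throws IOException;
       }
   
       class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
           // ... existing methods from the diff above ...
   
           @Override
           public void flush() throws IOException {
               // keep the existing ordering: rotated logs first, then the current log
               flushRotatedLogs();
               flushCurrentLogs();
           }
       }
   ```
   
   `EntryLogger#flush` would then simply delegate to `entryLogManager.flush()` without knowing which logs each manager keeps.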

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
