reddycharan commented on a change in pull request #1281: Issue #570: 
Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r181623201
 
 

 ##########
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##########
 @@ -788,89 +810,392 @@ private long readLastLogId(File f) {
         }
     }
 
-    /**
-     * Flushes all rotated log channels. After log channels are flushed,
-     * move leastUnflushedLogId ptr to current logId.
-     */
-    void checkpoint() throws IOException {
-        flushRotatedLogs();
+    interface EntryLogManager {
+
+        /*
+         * add entry to the corresponding entrylog and return the position of
+         * the entry in the entrylog
+         */
+        long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+        /*
+         * gets the active logChannel with the given entryLogId. null if it is
+         * not existing.
+         */
+        BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+        /*
+         * Returns eligible writable ledger dir for the creation next entrylog
+         */
+        File getDirForNextEntryLog(List<File> writableLedgerDirs);
+
+        /*
+         * Do the operations required for checkpoint.
+         */
+        void checkpoint() throws IOException;
+
+        /*
+         * flush both current and rotated logs.
+         */
+        void flush() throws IOException;
+
+        /*
+         * close current logs.
+         */
+        void close() throws IOException;
+
+        /*
+         * force close current logs.
+         */
+        void forceClose();
+
         /*
-         * In the case of entryLogPerLedgerEnabled we need to flush both
-         * rotatedlogs and currentlogs. This is needed because syncThread
-         * periodically does checkpoint and at this time all the logs should
-         * be flushed.
          *
-         * TODO: When EntryLogManager is introduced in the subsequent 
sub-tasks of
-         * this Issue, I will move this logic to individual implamentations of
-         * EntryLogManager and it would be free of this booalen flag based 
logic.
+         */
+        void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws 
IOException;
+
+        /*
+         * this method should be called before doing entrymemtable flush, it
+         * would save the state of the entrylogger before entrymemtable flush
+         * and commitEntryMemTableFlush would take appropriate action after
+         * entrymemtable flush.
+         */
+        void prepareEntryMemTableFlush();
+
+        /*
+         * this method should be called after doing entrymemtable flush,it 
would
+         * take appropriate action after entrymemtable flush depending on the
+         * current state of the entrylogger and the state of the entrylogger
+         * during prepareEntryMemTableFlush.
+         *
+         * It is assumed that there would be corresponding
+         * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
+         * would be called from the same thread.
          *
+         * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+         * after this commit method.
          */
-        if (entryLogPerLedgerEnabled) {
-            flushCurrentLog();
-        }
+        boolean commitEntryMemTableFlush() throws IOException;
     }
 
-    void flushRotatedLogs() throws IOException {
-        List<BufferedLogChannel> channels = null;
-        long flushedLogId = INVALID_LID;
-        synchronized (this) {
-            channels = logChannelsToFlush;
-            logChannelsToFlush = null;
+    abstract class EntryLogManagerBase implements EntryLogManager {
+        final List<BufferedLogChannel> rotatedLogChannels;
+
+        EntryLogManagerBase() {
+            rotatedLogChannels = new 
CopyOnWriteArrayList<BufferedLogChannel>();
         }
-        if (null == channels) {
-            return;
+
+        private final FastThreadLocal<ByteBuf> sizeBufferForAdd = new 
FastThreadLocal<ByteBuf>() {
+            @Override
+            protected ByteBuf initialValue() throws Exception {
+                return Unpooled.buffer(4);
+            }
+        };
+
+        /*
+         * This method should be guarded by a lock, so callers of this method
+         * should be in the right scope of the lock.
+         */
+        @Override
+        public long addEntry(long ledger, ByteBuf entry, boolean rollLog) 
throws IOException {
+            int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
+            BufferedLogChannel logChannel = 
getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
+            ByteBuf sizeBuffer = sizeBufferForAdd.get();
+            sizeBuffer.clear();
+            sizeBuffer.writeInt(entry.readableBytes());
+            logChannel.write(sizeBuffer);
+
+            long pos = logChannel.position();
+            logChannel.write(entry);
+            logChannel.registerWrittenEntry(ledger, entrySize);
+
+            return (logChannel.getLogId() << 32L) | pos;
         }
-        Iterator<BufferedLogChannel> chIter = channels.iterator();
-        while (chIter.hasNext()) {
-            BufferedLogChannel channel = chIter.next();
-            try {
-                channel.flushAndForceWrite(false);
-            } catch (IOException ioe) {
-                // rescue from flush exception, add unflushed channels back
-                synchronized (this) {
-                    if (null == logChannelsToFlush) {
-                        logChannelsToFlush = channels;
-                    } else {
-                        logChannelsToFlush.addAll(0, channels);
+
+        boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {
+            if (logChannel == null) {
+                return false;
+            }
+            return logChannel.position() + size > logSizeLimit;
+        }
+
+        boolean readEntryLogHardLimit(BufferedLogChannel logChannel, long 
size) {
+            if (logChannel == null) {
+                return false;
+            }
+            return logChannel.position() + size > Integer.MAX_VALUE;
+        }
+
+        abstract BufferedLogChannel getCurrentLogForLedger(long ledgerId);
+
+        abstract BufferedLogChannel getCurrentLogForLedgerForAddEntry(long 
ledgerId, int entrySize, boolean rollLog)
+                throws IOException;
+
+        abstract void setCurrentLogForLedgerAndAddToRotate(long ledgerId, 
BufferedLogChannel logChannel);
+
+        /*
+         * flush current logs.
+         */
+        abstract void flushCurrentLogs() throws IOException;
+
+        List<BufferedLogChannel> getRotatedLogChannels() {
+            return rotatedLogChannels;
+        }
+
+        @Override
+        public void flush() throws IOException {
 
 Review comment:
   we missed discussing about this (order of flush in flush method)
   
   i think it is incorrect to call flushrotatedlogs first and then 
flushcurrentlogs. Consider this scenario - if in between flushRotatedLogs, 
flushCurrentLogs calls current activelog is rotated and added to rotatedlogs 
list then we would miss flushing that log in this current flush call. So it has 
to be first flushCurrentLogs and then flushRotatedLogs.
   
   even in the case of 'volatile List<BufferedLogChannel>' also i dont think it 
is correct. For the following reason - 
   
   1) lets say flush method is called 
   2) now as per the existing implementation if flushRotatedLogs is called 
first then lets after the execution of the following synchronized block (where 
'logChannelsToFlush' is assigned to 'channels' and null is assigned to 
'logChannelsToFlush'), lets say current active entrylog is rotated in another 
thread (because of reaching the size limit) then the newly created active 
entrylog will be added to new 'logChannelsToFlush' list. And when 
flushCurrentLogs is called it would flush the current active entrylog but not 
the one which is just rotated. So we missed flushing the just rotated log in 
this flush() call
   
   
https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java#L816
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to