This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 82ad464  Issue #1345: entrylogger.flush should flush currentlog first.
82ad464 is described below

commit 82ad464d3b40e0f69e55556ae15ad06f29ec5003
Author: cguttapalem <[email protected]>
AuthorDate: Wed Apr 25 02:11:46 2018 -0700

    Issue #1345: entrylogger.flush should flush currentlog first.
    
    Descriptions of the changes in this PR:
    
    It is incorrect to call flushRotatedLogs first and
    then flushCurrentLogs in the EntryLogger
    (EntryLogManager) flush method. It should be the other way around.
    
    Master Issue: #1345
    
    Author: cguttapalem <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #1360 from reddycharan/elmflushorder, closes #1345
---
 .../org/apache/bookkeeper/bookie/EntryLogger.java  |   2 +-
 .../org/apache/bookkeeper/bookie/EntryLogTest.java | 119 +++++++++++++++++++++
 2 files changed, 120 insertions(+), 1 deletion(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index e2a25ed..d289b0e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -947,8 +947,8 @@ public class EntryLogger {
 
         @Override
         public void flush() throws IOException {
-            flushRotatedLogs();
             flushCurrentLogs();
+            flushRotatedLogs();
         }
 
         void flushLogChannel(BufferedLogChannel logChannel, boolean 
forceMetadata) throws IOException {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 21da951..d402a85 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -38,12 +38,17 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManager;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
 import 
org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
@@ -435,6 +440,120 @@ public class EntryLogTest {
         assertEquals(Sets.newHashSet(0L, 1L, 2L), 
entryLogger.getEntryLogsSet());
     }
 
+    /**
+     * In this testcase, entryLogger flush and entryLogger addEntry (which would
+     * call createNewLog) are called concurrently. Since the entryLogger flush
+     * method flushes both the current log and the rotated logs, it is expected
+     * that the current log and all rotated logs are flushed and force-written.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testFlushOrder() throws Exception {
+        entryLogger.shutdown();
+
+        int logSizeLimit = 256 * 1024;
+        conf.setEntryLogPerLedgerEnabled(false);
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setFlushIntervalInBytes(0);
+        conf.setEntryLogSizeLimit(logSizeLimit);
+
+        entryLogger = new EntryLogger(conf, dirsMgr);
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) 
entryLogger.getEntryLogManager();
+        AtomicBoolean exceptionHappened = new AtomicBoolean(false);
+
+        CyclicBarrier barrier = new CyclicBarrier(2);
+        List<BufferedLogChannel> rotatedLogChannels;
+        BufferedLogChannel currentActiveChannel;
+
+        exceptionHappened.set(false);
+
+        /*
+         * the higher the number of rotated logs, the easier it is to reproduce
+         * the issue regarding flush order
+         */
+        addEntriesAndRotateLogs(entryLogger, 30);
+
+        rotatedLogChannels = new 
LinkedList<BufferedLogChannel>(entryLogManager.getRotatedLogChannels());
+        currentActiveChannel = 
entryLogManager.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID);
+        long currentActiveChannelUnpersistedBytes = 
currentActiveChannel.getUnpersistedBytes();
+
+        Thread flushThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    barrier.await();
+                    entryLogger.flush();
+                } catch (InterruptedException | BrokenBarrierException | 
IOException e) {
+                    LOG.error("Exception happened for entryLogger.flush", e);
+                    exceptionHappened.set(true);
+                }
+            }
+        });
+
+        Thread createdNewLogThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    barrier.await();
+                    /*
+                     * here we are adding entry of size logSizeLimit with
+                     * rolllog=true, so it would create a new entrylog.
+                     */
+                    entryLogger.addEntry(123, generateEntry(123, 456, 
logSizeLimit), true);
+                } catch (InterruptedException | BrokenBarrierException | 
IOException e) {
+                    LOG.error("Exception happened for 
entryLogManager.createNewLog", e);
+                    exceptionHappened.set(true);
+                }
+            }
+        });
+
+        /*
+         * concurrently entryLogger flush and entryLogger addEntry (which would
+         * call createNewLog) would be called from different threads.
+         */
+        flushThread.start();
+        createdNewLogThread.start();
+        flushThread.join();
+        createdNewLogThread.join();
+
+        Assert.assertFalse("Exception happened in one of the operation", 
exceptionHappened.get());
+
+        /*
+         * if flush of the previous current channel is called then the
+         * unpersistedBytes should be less than what it was before, actually it
+         * would be close to zero (but when new log is created with addEntry
+         * call, ledgers map will be appended at the end of entry log)
+         */
+        Assert.assertTrue(
+                "previous currentChannel unpersistedBytes should be less than 
" + currentActiveChannelUnpersistedBytes
+                        + ", but it is actually " + 
currentActiveChannel.getUnpersistedBytes(),
+                currentActiveChannel.getUnpersistedBytes() < 
currentActiveChannelUnpersistedBytes);
+        for (BufferedLogChannel rotatedLogChannel : rotatedLogChannels) {
+            Assert.assertEquals("previous rotated entrylog should be 
flushandforcewritten", 0,
+                    rotatedLogChannel.getUnpersistedBytes());
+        }
+    }
+
+    void addEntriesAndRotateLogs(EntryLogger entryLogger, int numOfRotations)
+            throws IOException {
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) 
entryLogger.getEntryLogManager();
+        
entryLogManager.setCurrentLogForLedgerAndAddToRotate(EntryLogger.UNASSIGNED_LEDGERID,
 null);
+        for (int i = 0; i < numOfRotations; i++) {
+            addEntries(entryLogger, 10);
+            
entryLogManager.setCurrentLogForLedgerAndAddToRotate(EntryLogger.UNASSIGNED_LEDGERID,
 null);
+        }
+        addEntries(entryLogger, 10);
+    }
+
+    void addEntries(EntryLogger entryLogger, int noOfEntries) throws 
IOException {
+        for (int j = 0; j < noOfEntries; j++) {
+            int ledgerId = Math.abs(rand.nextInt());
+            int entryId = Math.abs(rand.nextInt());
+            entryLogger.addEntry(ledgerId, generateEntry(ledgerId, 
entryId).nioBuffer());
+        }
+    }
+
     static class LedgerStorageWriteTask implements Callable<Boolean> {
         long ledgerId;
         int entryId;

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to