This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 82ad464 Issue #1345: entrylogger.flush should flush currentlog first.
82ad464 is described below
commit 82ad464d3b40e0f69e55556ae15ad06f29ec5003
Author: cguttapalem <[email protected]>
AuthorDate: Wed Apr 25 02:11:46 2018 -0700
Issue #1345: entrylogger.flush should flush currentlog first.
Descriptions of the changes in this PR:
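It is incorrect to call flushRotatedLogs first and
then flushCurrentLogs in the EntryLogger
(EntryLogManager) flush method; it should be the other way around.
Otherwise, a log that gets rotated by a concurrent createNewLog call
between the two steps can be missed by the flush.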
Master Issue: #1345
Author: cguttapalem <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #1360 from reddycharan/elmflushorder, closes #1345
---
.../org/apache/bookkeeper/bookie/EntryLogger.java | 2 +-
.../org/apache/bookkeeper/bookie/EntryLogTest.java | 119 +++++++++++++++++++++
2 files changed, 120 insertions(+), 1 deletion(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index e2a25ed..d289b0e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -947,8 +947,8 @@ public class EntryLogger {
@Override
public void flush() throws IOException {
- flushRotatedLogs();
flushCurrentLogs();
+ flushRotatedLogs();
}
void flushLogChannel(BufferedLogChannel logChannel, boolean
forceMetadata) throws IOException {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 21da951..d402a85 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -38,12 +38,17 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManager;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
import
org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
@@ -435,6 +440,120 @@ public class EntryLogTest {
assertEquals(Sets.newHashSet(0L, 1L, 2L),
entryLogger.getEntryLogsSet());
}
+ /**
+ * In this testcase, entryLogger flush and entryLogger addEntry (which would
+ * call createNewLog) are called concurrently. Since the entryLogger flush
+ * method flushes both the current log and the rotated logs, it is expected
+ * that the currentLog and all rotatedLogs are flushed and force-written.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testFlushOrder() throws Exception {
+ entryLogger.shutdown();
+
+ int logSizeLimit = 256 * 1024;
+ conf.setEntryLogPerLedgerEnabled(false);
+ conf.setEntryLogFilePreAllocationEnabled(false);
+ conf.setFlushIntervalInBytes(0);
+ conf.setEntryLogSizeLimit(logSizeLimit);
+
+ entryLogger = new EntryLogger(conf, dirsMgr);
+ EntryLogManagerBase entryLogManager = (EntryLogManagerBase)
entryLogger.getEntryLogManager();
+ AtomicBoolean exceptionHappened = new AtomicBoolean(false);
+
+ CyclicBarrier barrier = new CyclicBarrier(2);
+ List<BufferedLogChannel> rotatedLogChannels;
+ BufferedLogChannel currentActiveChannel;
+
+ exceptionHappened.set(false);
+
+ /*
+ * the higher the number of rotated logs, the easier it is to reproduce
+ * the issue regarding flush order
+ */
+ addEntriesAndRotateLogs(entryLogger, 30);
+
+ rotatedLogChannels = new
LinkedList<BufferedLogChannel>(entryLogManager.getRotatedLogChannels());
+ currentActiveChannel =
entryLogManager.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID);
+ long currentActiveChannelUnpersistedBytes =
currentActiveChannel.getUnpersistedBytes();
+
+ Thread flushThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ barrier.await();
+ entryLogger.flush();
+ } catch (InterruptedException | BrokenBarrierException |
IOException e) {
+ LOG.error("Exception happened for entryLogger.flush", e);
+ exceptionHappened.set(true);
+ }
+ }
+ });
+
+ Thread createdNewLogThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ barrier.await();
+ /*
+ * here we are adding entry of size logSizeLimit with
+ * rolllog=true, so it would create a new entrylog.
+ */
+ entryLogger.addEntry(123, generateEntry(123, 456,
logSizeLimit), true);
+ } catch (InterruptedException | BrokenBarrierException |
IOException e) {
+ LOG.error("Exception happened for
entryLogManager.createNewLog", e);
+ exceptionHappened.set(true);
+ }
+ }
+ });
+
+ /*
+ * entryLogger flush and entryLogger addEntry (which would call
+ * createNewLog) are called concurrently from different threads.
+ */
+ flushThread.start();
+ createdNewLogThread.start();
+ flushThread.join();
+ createdNewLogThread.join();
+
+ Assert.assertFalse("Exception happened in one of the operation",
exceptionHappened.get());
+
+ /*
+ * if flush of the previous current channel is called then the
+ * unpersistedBytes should be less than what it was before, actually it
+ * would be close to zero (but when new log is created with addEntry
+ * call, ledgers map will be appended at the end of entry log)
+ */
+ Assert.assertTrue(
+ "previous currentChannel unpersistedBytes should be less than
" + currentActiveChannelUnpersistedBytes
+ + ", but it is actually " +
currentActiveChannel.getUnpersistedBytes(),
+ currentActiveChannel.getUnpersistedBytes() <
currentActiveChannelUnpersistedBytes);
+ for (BufferedLogChannel rotatedLogChannel : rotatedLogChannels) {
+ Assert.assertEquals("previous rotated entrylog should be
flushandforcewritten", 0,
+ rotatedLogChannel.getUnpersistedBytes());
+ }
+ }
+
+ void addEntriesAndRotateLogs(EntryLogger entryLogger, int numOfRotations)
+ throws IOException {
+ EntryLogManagerBase entryLogManager = (EntryLogManagerBase)
entryLogger.getEntryLogManager();
+
entryLogManager.setCurrentLogForLedgerAndAddToRotate(EntryLogger.UNASSIGNED_LEDGERID,
null);
+ for (int i = 0; i < numOfRotations; i++) {
+ addEntries(entryLogger, 10);
+
entryLogManager.setCurrentLogForLedgerAndAddToRotate(EntryLogger.UNASSIGNED_LEDGERID,
null);
+ }
+ addEntries(entryLogger, 10);
+ }
+
+ void addEntries(EntryLogger entryLogger, int noOfEntries) throws
IOException {
+ for (int j = 0; j < noOfEntries; j++) {
+ int ledgerId = Math.abs(rand.nextInt());
+ int entryId = Math.abs(rand.nextInt());
+ entryLogger.addEntry(ledgerId, generateEntry(ledgerId,
entryId).nioBuffer());
+ }
+ }
+
static class LedgerStorageWriteTask implements Callable<Boolean> {
long ledgerId;
int entryId;
--
To stop receiving notification emails like this one, please contact
[email protected].