This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a3c141e Issue #570: Introduce EntryMemTableWithParallelFlusher
a3c141e is described below
commit a3c141eee827c583828251756d4e52a70398b7af
Author: cguttapalem <[email protected]>
AuthorDate: Fri Apr 27 16:33:49 2018 -0700
Issue #570: Introduce EntryMemTableWithParallelFlusher
Descriptions of the changes in this PR:
This is sub-task 4 of Issue #570.
When entry log per ledger is enabled, the entry memtable can be
flushed in parallel to make better use of the multiple entry logs.
This sub-task introduces EntryMemTableWithParallelFlusher, which
flushes through an OrderedExecutor with "numOfMemtableFlushThreads"
threads.
Master Issue: #570
Author: cguttapalem <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #1358 from reddycharan/memtableparallel, closes #570
---
.../apache/bookkeeper/bookie/EntryMemTable.java | 21 ++-
.../bookie/EntryMemTableWithParallelFlusher.java | 154 +++++++++++++++++++
.../bookkeeper/bookie/SortedLedgerStorage.java | 11 +-
.../bookkeeper/conf/ServerConfiguration.java | 21 +++
.../org/apache/bookkeeper/bookie/EntryLogTest.java | 50 ++++++-
.../apache/bookkeeper/bookie/LedgerCacheTest.java | 164 +++++++++++++++++++--
.../bookkeeper/bookie/TestEntryMemTable.java | 43 +++++-
conf/bk_server.conf | 3 +
site/_data/config/bk_server.yaml | 4 +
9 files changed, 442 insertions(+), 29 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index 50e93a3..70f437c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
* We continue to serve edits out of new EntrySkipList and backing snapshot
until
* flusher reports in that the flush succeeded. At that point we let the
snapshot go.
*/
-public class EntryMemTable {
+public class EntryMemTable implements AutoCloseable{
private static Logger logger = LoggerFactory.getLogger(Journal.class);
/**
@@ -117,7 +117,7 @@ public class EntryMemTable {
private final OpStatsLogger snapshotStats;
private final OpStatsLogger putEntryStats;
private final OpStatsLogger getEntryStats;
- private final Counter flushBytesCounter;
+ final Counter flushBytesCounter;
private final Counter throttlingCounter;
/**
@@ -232,10 +232,14 @@ public class EntryMemTable {
}
/**
- * Flush snapshot and clear it iff its data is before checkpoint.
- * Only this function change non-empty this.snapshot.
+ * Flush snapshot and clear it iff its data is before checkpoint. Only this
+ * function change non-empty this.snapshot.
+ *
+ * <p>EntryMemTableWithParallelFlusher overrides this flushSnapshot
method. So
+ * any change in functionality/behavior/characteristic of this method
should
+ * also reflect in EntryMemTableWithParallelFlusher's flushSnapshot method.
*/
- private long flushSnapshot(final SkipListFlusher flusher, Checkpoint
checkpoint) throws IOException {
+ long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint)
throws IOException {
long size = 0;
if (this.snapshot.compareTo(checkpoint) < 0) {
long ledger, ledgerGC = -1;
@@ -268,7 +272,7 @@ public class EntryMemTable {
* @param keyValues The snapshot to clean out.
* @see {@link #snapshot()}
*/
- private void clearSnapshot(final EntrySkipList keyValues) {
+ void clearSnapshot(final EntrySkipList keyValues) {
// Caller makes sure that keyValues not empty
assert !keyValues.isEmpty();
this.lock.writeLock().lock();
@@ -452,4 +456,9 @@ public class EntryMemTable {
boolean isEmpty() {
return size.get() == 0 && snapshot.isEmpty();
}
+
+ @Override
+ public void close() throws Exception {
+ /*
+ * No-op in the base memtable. EntryMemTableWithParallelFlusher
+ * overrides close() to shut down its flush executor, and
+ * SortedLedgerStorage calls close() on its memtable while shutting down.
+ */
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
new file mode 100644
index 0000000..a3849e9
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
@@ -0,0 +1,154 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.SafeRunnable;
+
+/**
+ * EntryMemTable that flushes its snapshot in parallel, one task per ledger.
+ *
+ * <p>{@link #flushSnapshot} splits the snapshot into per-ledger sub-map views
+ * and submits each to {@link #flushExecutor} (an {@link OrderedExecutor})
+ * with the ledger id as the ordering key, then waits on a {@link Phaser}
+ * until all of them have finished. The executor thread count comes from
+ * {@link ServerConfiguration#getNumOfMemtableFlushThreads()}.
+ *
+ * <p>SortedLedgerStorage creates this class instead of a plain EntryMemTable
+ * when entry log per ledger is enabled. {@link #close()} shuts the flush
+ * executor down.
+ */
+@Slf4j
+class EntryMemTableWithParallelFlusher extends EntryMemTable {
+ /** Executor for the per-ledger flush tasks; the ordering key is the ledger id. */
+ final OrderedExecutor flushExecutor;
+ /**
+ * Creates the memtable and an OrderedExecutor named "MemtableFlushThreads"
+ * with {@code conf.getNumOfMemtableFlushThreads()} threads.
+ */
+ public EntryMemTableWithParallelFlusher(final ServerConfiguration conf,
final CheckpointSource source,
+ final StatsLogger statsLogger) {
+ super(conf, source, statsLogger);
+ this.flushExecutor =
OrderedExecutor.newBuilder().numThreads(conf.getNumOfMemtableFlushThreads())
+ .name("MemtableFlushThreads").build();
+ }
+
+ /**
+ * Flushes the snapshot through {@code flusher} if it is before
+ * {@code checkpoint}, processing each ledger's entries as a separate task on
+ * {@link #flushExecutor}.
+ *
+ * <p>Functionally this does the same as EntryMemTable's flushSnapshot (the
+ * two must be kept in sync). However, sub-map views of the snapshot, one per
+ * ledger, are submitted to the executor with the ledger id as the
+ * orderingKey. The calling thread then waits on a {@link Phaser} until every
+ * task has arrived. Within a ledger, once the flusher throws
+ * NoLedgerException the remaining entries are skipped, though their lengths
+ * are still counted. If any task fails with another exception, the phaser is
+ * force-terminated, the snapshot is not cleared, and an IOException wrapping
+ * that exception is thrown.
+ *
+ * <p>NOTE(review): {@link Phaser} supports at most 65535 registered parties.
+ * If more ledger tasks than that are registered and not yet finished at the
+ * same time, register() throws IllegalStateException. That exception is not
+ * caught here and escapes flushSnapshot unwrapped.
+ *
+ * @param flusher receives every entry of the snapshot
+ * @param checkpoint the snapshot is flushed only if it compares below this
+ * @return the sum of the lengths of the entries visited, or 0 if the snapshot
+ * was not flushed
+ * @throws IOException if a ledger task failed, or if waiting on the phaser
+ * threw IllegalStateException
+ */
+ @Override
+ long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint)
throws IOException {
+ AtomicLong flushedSize = new AtomicLong();
+ if (this.snapshot.compareTo(checkpoint) < 0) {
+ synchronized (this) {
+ EntrySkipList keyValues = this.snapshot;
+ /* One party for this thread; each ledger task registers another below. */
+ Phaser pendingNumOfLedgerFlushes = new Phaser(1);
+ AtomicReference<Exception> exceptionWhileFlushingParallelly =
new AtomicReference<Exception>();
+ /* Re-check now that the monitor is held; the check above ran without it. */
+ if (keyValues.compareTo(checkpoint) < 0) {
+ /* Submit one flush task per ledger present in the snapshot. */
+ Map.Entry<EntryKey, EntryKeyValue> thisLedgerFirstMapEntry
= keyValues.firstEntry();
+ EntryKeyValue thisLedgerFirstEntry;
+ long thisLedgerId;
+
+ while (thisLedgerFirstMapEntry != null) {
+ thisLedgerFirstEntry =
thisLedgerFirstMapEntry.getValue();
+ thisLedgerId = thisLedgerFirstEntry.getLedgerId();
+ EntryKey thisLedgerCeilingKeyMarker = new
EntryKey(thisLedgerId, Long.MAX_VALUE - 1);
+ /*
+ * Gets a view of the portion of the snapshot that
+ * corresponds to entries of this ledger, from its first
+ * entry up to, but excluding, the ceiling marker.
+ *
+ * NOTE(review): the marker uses entryId
+ * Long.MAX_VALUE - 1 and subMap's upper bound is
+ * exclusive. Assuming EntryKey orders by ledgerId, then
+ * entryId (confirm), an entry of this ledger with
+ * entryId >= Long.MAX_VALUE - 1 is left out of this
+ * view. ceilingEntry(marker) below would then return
+ * this same ledger again instead of moving on to the
+ * next one.
+ */
+ ConcurrentNavigableMap<EntryKey, EntryKeyValue>
thisLedgerEntries = keyValues
+ .subMap(thisLedgerFirstEntry,
thisLedgerCeilingKeyMarker);
+ pendingNumOfLedgerFlushes.register(); // Phaser allows at most 65535 registered parties
+ flushExecutor.executeOrdered(thisLedgerId, new
SafeRunnable() {
+ @Override
+ public void safeRun() {
+ try {
+ long ledger;
+ boolean ledgerDeleted = false; // set on NoLedgerException; later entries are skipped but still counted
+ for (EntryKey key :
thisLedgerEntries.keySet()) {
+ EntryKeyValue kv = (EntryKeyValue) key;
+ flushedSize.addAndGet(kv.getLength());
+ ledger = kv.getLedgerId();
+ if (!ledgerDeleted) {
+ try {
+ flusher.process(ledger,
kv.getEntryId(), kv.getValueAsByteBuffer());
+ } catch (NoLedgerException
exception) {
+ ledgerDeleted = true;
+ }
+ }
+ }
+
pendingNumOfLedgerFlushes.arriveAndDeregister(); // only reached if every entry was handled
+ } catch (Exception exc) {
+ log.error("Got Exception while trying to
flush process entryies: ", exc);
+ exceptionWhileFlushingParallelly.set(exc);
+ /*
+ * if we get any unexpected exception while
+ * trying to flush process entries of a
+ * ledger, then terminate the
+ * pendingNumOfLedgerFlushes phaser. This wakes
+ * the flushing thread (arriveAndAwaitAdvance
+ * returns a negative phase), which then throws
+ * an IOException. Tasks for other ledgers are
+ * not cancelled and may still be running at
+ * that point.
+ */
+
pendingNumOfLedgerFlushes.forceTermination();
+ }
+ }
+ });
+ thisLedgerFirstMapEntry =
keyValues.ceilingEntry(thisLedgerCeilingKeyMarker);
+ }
+ /* All ledger tasks are submitted; wait for them to finish. */
+ boolean phaserTerminatedAbruptly = false;
+ try {
+ /*
+ * If a ledger task failed with an unexpected exception,
+ * it force-terminated pendingNumOfLedgerFlushes, and
+ * arriveAndAwaitAdvance then returns a negative value
+ * instead of the arrival phase number.
+ */
+ phaserTerminatedAbruptly =
(pendingNumOfLedgerFlushes.arriveAndAwaitAdvance() < 0);
+ } catch (IllegalStateException ise) { // per Phaser docs: unarrived parties would become negative
+ log.error("Got IllegalStateException while awaiting on
Phaser", ise);
+ throw new IOException("Got IllegalStateException while
awaiting on Phaser", ise);
+ }
+ if (phaserTerminatedAbruptly) {
+ log.error("Phaser is terminated while awaiting
flushExecutor to complete the entry flushes",
+ exceptionWhileFlushingParallelly.get());
+ throw new IOException("Failed to complete the
flushSnapshotByParallelizing",
+ exceptionWhileFlushingParallelly.get());
+ }
+ flushBytesCounter.add(flushedSize.get()); // every ledger task arrived without error
+ clearSnapshot(keyValues);
+ }
+ }
+ }
+ return flushedSize.longValue(); // 0 if the snapshot was not flushed
+ }
+ /**
+ * Shuts down {@link #flushExecutor}. This only calls shutdown(); it does
+ * not wait here for already-submitted flush tasks to finish.
+ */
+ @Override
+ public void close() throws Exception {
+ flushExecutor.shutdown();
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index f2efa55..815c65e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -72,7 +72,11 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
checkpointSource,
checkpointer,
statsLogger);
- this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger);
+ if (conf.isEntryLogPerLedgerEnabled()) {
+ this.memTable = new EntryMemTableWithParallelFlusher(conf,
checkpointSource, statsLogger);
+ } else {
+ this.memTable = new EntryMemTable(conf, checkpointSource,
statsLogger);
+ }
this.scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("SortedLedgerStorage-%d")
@@ -102,6 +106,11 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
if (!scheduler.awaitTermination(3, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
+ try {
+ memTable.close();
+ } catch (Exception e) {
+ LOG.error("Error while closing the memtable", e);
+ }
super.shutdown();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 785b35a..bdc26e7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -185,6 +185,9 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
* config specifying if the entrylog per ledger is enabled or not.
*/
protected static final String ENTRY_LOG_PER_LEDGER_ENABLED =
"entryLogPerLedgerEnabled";
+ // When entry log per ledger is enabled, multiple threads can be used to flush the memtable in parallel.
+ protected static final String NUMBER_OF_MEMTABLE_FLUSH_THREADS =
"numOfMemtableFlushThreads";
+
/**
* Construct a default configuration object.
@@ -2723,4 +2726,22 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
this.setProperty(ENTRY_LOG_PER_LEDGER_ENABLED,
Boolean.toString(entryLogPerLedgerEnabled));
return this;
}
+
+ /**
+ * Gets the number of threads used to flush the entry memtable in parallel.
+ *
+ * <p>Only relevant when entry log per ledger is enabled. In that case,
+ * SortedLedgerStorage uses EntryMemTableWithParallelFlusher, whose flush
+ * executor is sized by this value.
+ *
+ * @return the configured numOfMemtableFlushThreads, or 8 if it is not set
+ */
+ public int getNumOfMemtableFlushThreads() {
+ return this.getInt(NUMBER_OF_MEMTABLE_FLUSH_THREADS, 8);
+ }
+
+ /**
+ * Sets the number of threads used to flush the entry memtable in parallel
+ * when entry log per ledger is enabled.
+ *
+ * @param numOfMemtableFlushThreads number of memtable flush threads
+ * @return this configuration, for chaining
+ */
+ public ServerConfiguration setNumOfMemtableFlushThreads(int
numOfMemtableFlushThreads) {
+ this.setProperty(NUMBER_OF_MEMTABLE_FLUSH_THREADS,
Integer.toString(numOfMemtableFlushThreads));
+ return this;
+ }
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index d402a85..99c1ad4 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -629,20 +629,64 @@ public class EntryLogTest {
}
/**
- * test concurrent write operations and then concurrent read
- * operations using InterleavedLedgerStorage.
+ * test concurrent write operations and then concurrent read operations
+ * using InterleavedLedgerStorage.
*/
@Test
public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage()
throws Exception {
+
testConcurrentWriteAndReadCalls(InterleavedLedgerStorage.class.getName(),
false);
+ }
+
+ /**
+ * Tests concurrent write operations followed by concurrent read operations
+ * using InterleavedLedgerStorage with entry log per ledger enabled.
+ */
+ @Test
+ public void
testConcurrentWriteAndReadCallsOfInterleavedLedgerStorageWithELPLEnabled()
throws Exception {
+
testConcurrentWriteAndReadCalls(InterleavedLedgerStorage.class.getName(), true);
+ }
+
+ /**
+ * Tests concurrent write operations followed by concurrent read operations
+ * using SortedLedgerStorage; the helper also asserts that the memtable is a
+ * plain EntryMemTable.
+ */
+ @Test
+ public void testConcurrentWriteAndReadCallsOfSortedLedgerStorage() throws
Exception {
+ testConcurrentWriteAndReadCalls(SortedLedgerStorage.class.getName(),
false);
+ }
+
+ /**
+ * Tests concurrent write operations followed by concurrent read operations
+ * using SortedLedgerStorage with entry log per ledger enabled; the helper
+ * also asserts that the memtable is an EntryMemTableWithParallelFlusher.
+ */
+ @Test
+ public void
testConcurrentWriteAndReadCallsOfSortedLedgerStorageWithELPLEnabled() throws
Exception {
+ testConcurrentWriteAndReadCalls(SortedLedgerStorage.class.getName(),
true);
+ }
+
+ public void testConcurrentWriteAndReadCalls(String ledgerStorageClass,
boolean entryLogPerLedgerEnabled)
+ throws Exception {
File ledgerDir = createTempDir("bkTest", ".dir");
ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
conf.setJournalDirName(ledgerDir.toString());
conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
- conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+ conf.setLedgerStorageClass(ledgerStorageClass);
+ conf.setEntryLogPerLedgerEnabled(entryLogPerLedgerEnabled);
Bookie bookie = new Bookie(conf);
InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage)
bookie.ledgerStorage);
Random rand = new Random(0);
+ if (ledgerStorageClass.equals(SortedLedgerStorage.class.getName())) {
+ Assert.assertEquals("LedgerStorage Class",
SortedLedgerStorage.class, ledgerStorage.getClass());
+ if (entryLogPerLedgerEnabled) {
+ Assert.assertEquals("MemTable Class",
EntryMemTableWithParallelFlusher.class,
+ ((SortedLedgerStorage)
ledgerStorage).memTable.getClass());
+ } else {
+ Assert.assertEquals("MemTable Class", EntryMemTable.class,
+ ((SortedLedgerStorage)
ledgerStorage).memTable.getClass());
+ }
+ }
+
int numOfLedgers = 70;
int numEntries = 1500;
// Create ledgers
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 1c51209..ae3b4cd 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -37,7 +37,10 @@ import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -451,19 +454,32 @@ public class LedgerCacheTest {
static class FlushTestSortedLedgerStorage extends SortedLedgerStorage {
final AtomicBoolean injectMemTableSizeLimitReached;
final AtomicBoolean injectFlushException;
+ final AtomicLong injectFlushExceptionForLedger;
+ final AtomicInteger numOfTimesFlushSnapshotCalled = new
AtomicInteger(0);
+ static final long FORALLLEDGERS = -1;
public FlushTestSortedLedgerStorage() {
super();
injectMemTableSizeLimitReached = new AtomicBoolean();
injectFlushException = new AtomicBoolean();
+ injectFlushExceptionForLedger = new AtomicLong(FORALLLEDGERS);
}
public void setInjectMemTableSizeLimitReached(boolean setValue) {
injectMemTableSizeLimitReached.set(setValue);
}
- public void setInjectFlushException(boolean setValue) {
+ public void setInjectFlushException(boolean setValue, long ledgerId) {
injectFlushException.set(setValue);
+ injectFlushExceptionForLedger.set(ledgerId);
+ }
+
+ public void incrementNumOfTimesFlushSnapshotCalled() {
+ numOfTimesFlushSnapshotCalled.incrementAndGet();
+ }
+
+ public int getNumOfTimesFlushSnapshotCalled() {
+ return numOfTimesFlushSnapshotCalled.get();
}
@Override
@@ -484,21 +500,44 @@ public class LedgerCacheTest {
checkpointSource,
checkpointer,
statsLogger);
- this.memTable = new EntryMemTable(conf, checkpointSource,
statsLogger) {
- @Override
- boolean isSizeLimitReached() {
- return (injectMemTableSizeLimitReached.get() ||
super.isSizeLimitReached());
- }
- };
+ if (this.memTable instanceof EntryMemTableWithParallelFlusher) {
+ this.memTable = new EntryMemTableWithParallelFlusher(conf,
checkpointSource, statsLogger) {
+ @Override
+ boolean isSizeLimitReached() {
+ return (injectMemTableSizeLimitReached.get() ||
super.isSizeLimitReached());
+ }
+
+ @Override
+ long flushSnapshot(final SkipListFlusher flusher,
Checkpoint checkpoint) throws IOException {
+ incrementNumOfTimesFlushSnapshotCalled();
+ return super.flushSnapshot(flusher, checkpoint);
+ }
+ };
+ } else {
+ this.memTable = new EntryMemTable(conf, checkpointSource,
statsLogger) {
+ @Override
+ boolean isSizeLimitReached() {
+ return (injectMemTableSizeLimitReached.get() ||
super.isSizeLimitReached());
+ }
+
+ @Override
+ long flushSnapshot(final SkipListFlusher flusher,
Checkpoint checkpoint) throws IOException {
+ incrementNumOfTimesFlushSnapshotCalled();
+ return super.flushSnapshot(flusher, checkpoint);
+ }
+ };
+ }
}
@Override
public void process(long ledgerId, long entryId, ByteBuf buffer)
throws IOException {
- if (injectFlushException.get()) {
+ if (injectFlushException.get() &&
((injectFlushExceptionForLedger.get() == FORALLLEDGERS)
+ || (injectFlushExceptionForLedger.get() == ledgerId))) {
throw new IOException("Injected Exception");
}
super.process(ledgerId, entryId, buffer);
}
+
// simplified memTable full callback.
@Override
public void onSizeLimitReached(final CheckpointSource.Checkpoint cp)
throws IOException {
@@ -542,19 +581,18 @@ public class LedgerCacheTest {
// set flags, so that FlushTestSortedLedgerStorage simulates
FlushFailure scenario
flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
- flushTestSortedLedgerStorage.setInjectFlushException(true);
+ flushTestSortedLedgerStorage.setInjectFlushException(true,
FlushTestSortedLedgerStorage.FORALLLEDGERS);
flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
- Thread.sleep(1000);
// since we simulated sizeLimitReached, snapshot shouldn't be empty
assertFalse("EntryMemTable SnapShot is not expected to be empty",
memTable.snapshot.isEmpty());
+ assertEquals("Flusher called", 1,
flushTestSortedLedgerStorage.getNumOfTimesFlushSnapshotCalled());
// set the flags to false, so flush will succeed this time
flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(false);
- flushTestSortedLedgerStorage.setInjectFlushException(false);
+ flushTestSortedLedgerStorage.setInjectFlushException(false,
FlushTestSortedLedgerStorage.FORALLLEDGERS);
flushTestSortedLedgerStorage.addEntry(generateEntry(1, 3));
- Thread.sleep(1000);
// since we expect memtable flush to succeed, memtable snapshot should
be empty
assertTrue("EntryMemTable SnapShot is expected to be empty, because of
successful flush",
memTable.snapshot.isEmpty());
@@ -586,7 +624,7 @@ public class LedgerCacheTest {
// set flags, so that FlushTestSortedLedgerStorage simulates
FlushFailure scenario
flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
- flushTestSortedLedgerStorage.setInjectFlushException(true);
+ flushTestSortedLedgerStorage.setInjectFlushException(true,
FlushTestSortedLedgerStorage.FORALLLEDGERS);
flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
// since we simulated sizeLimitReached, snapshot shouldn't be empty
@@ -613,4 +651,104 @@ public class LedgerCacheTest {
bb.writeBytes(data);
return bb;
}
+
+ @Test // a forced memtable flush with entry log per ledger enabled runs flushSnapshot once and empties the snapshot
+ public void testEntryMemTableParallelFlush() throws Exception {
+ int gcWaitTime = 1000;
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setGcWaitTime(gcWaitTime);
+ conf.setLedgerDirNames(createAndGetLedgerDirs(1));
+
conf.setLedgerStorageClass(FlushTestSortedLedgerStorage.class.getName());
+ // enable entrylog per ledger
+ conf.setEntryLogPerLedgerEnabled(true);
+
+ Bookie bookie = new Bookie(conf);
+ FlushTestSortedLedgerStorage flushTestSortedLedgerStorage =
(FlushTestSortedLedgerStorage) bookie.ledgerStorage;
+ EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
+
+ /*
+ * These bookie.addEntry calls are required: they create the FileInfo
+ * for ledgers 1, 2 and 3. Without that FileInfo, the
+ * 'flushTestSortedLedgerStorage.addEntry' calls below would fail
+ * because of the BOOKKEEPER-965 change.
+ */
+ bookie.addEntry(generateEntry(1, 1), false, new
Bookie.NopWriteCallback(), null, "passwd".getBytes());
+ bookie.addEntry(generateEntry(2, 1), false, new
Bookie.NopWriteCallback(), null, "passwd".getBytes());
+ bookie.addEntry(generateEntry(3, 1), false, new
Bookie.NopWriteCallback(), null, "passwd".getBytes());
+
+ flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
+ flushTestSortedLedgerStorage.addEntry(generateEntry(2, 2));
+ flushTestSortedLedgerStorage.addEntry(generateEntry(3, 2));
+
+ assertTrue("EntryMemTable SnapShot is expected to be empty",
memTable.snapshot.isEmpty());
+ assertFalse("EntryMemTable is not expected to be empty",
memTable.isEmpty());
+
+ // inject MemTableSizeLimitReached, so entrymemtable will be flushed
+ flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
+ flushTestSortedLedgerStorage.addEntry(generateEntry(1, 3));
+ /* NOTE(review): the bookie is never shut down in this test, so presumably its ledger storage and the memtable's flush executor stay running; confirm this is intended. */
+ // since we simulated sizeLimitReached, snapshot should have been
created and flushed
+ assertTrue("EntryMemTable SnapShot is expected to be empty",
memTable.snapshot.isEmpty());
+ assertEquals("Flusher called", 1,
flushTestSortedLedgerStorage.getNumOfTimesFlushSnapshotCalled());
+ }
+
+ @Test // a flush failing for ledger 1 keeps the snapshot; a later flush without the failure clears it
+ public void testEntryMemTableParallelFlushWithFlushException() throws
Exception {
+ int gcWaitTime = 1000;
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setGcWaitTime(gcWaitTime);
+ conf.setLedgerDirNames(createAndGetLedgerDirs(1));
+
conf.setLedgerStorageClass(FlushTestSortedLedgerStorage.class.getName());
+ // enable entrylog per ledger
+ conf.setEntryLogPerLedgerEnabled(true);
+
+ Bookie bookie = new Bookie(conf);
+ FlushTestSortedLedgerStorage flushTestSortedLedgerStorage =
(FlushTestSortedLedgerStorage) bookie.ledgerStorage;
+ EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
+
+ /*
+ * These bookie.addEntry calls are required: they create the FileInfo
+ * for ledgers 1, 2 and 3. Without that FileInfo, the
+ * 'flushTestSortedLedgerStorage.addEntry' calls below would fail
+ * because of the BOOKKEEPER-965 change.
+ */
+ bookie.addEntry(generateEntry(1, 1), false, new
Bookie.NopWriteCallback(), null, "passwd".getBytes());
+ bookie.addEntry(generateEntry(2, 1), false, new
Bookie.NopWriteCallback(), null, "passwd".getBytes());
+ bookie.addEntry(generateEntry(3, 1), false, new
Bookie.NopWriteCallback(), null, "passwd".getBytes());
+
+ flushTestSortedLedgerStorage.addEntry(generateEntry(1, 4));
+ flushTestSortedLedgerStorage.addEntry(generateEntry(2, 4));
+ flushTestSortedLedgerStorage.addEntry(generateEntry(3, 4));
+
+ // inject MemTableSizeLimitReached and FlushException, so
entrymemtable flush will fail
+ flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
+ flushTestSortedLedgerStorage.setInjectFlushException(true, 1L);
+ /* Only ledger 1's entries fail in process(); ledgers 2 and 3 are not injected. */
+ flushTestSortedLedgerStorage.addEntry(generateEntry(1, 5));
+ // since we simulate FlushException, memtable snapshot should not be
empty
+ assertFalse("EntryMemTable SnapShot is not expected to be empty",
memTable.snapshot.isEmpty());
+ assertEquals("Flusher called", 1,
flushTestSortedLedgerStorage.getNumOfTimesFlushSnapshotCalled());
+
+ flushTestSortedLedgerStorage.setInjectFlushException(false,
FlushTestSortedLedgerStorage.FORALLLEDGERS);
+ flushTestSortedLedgerStorage.addEntry(generateEntry(1, 5));
+ /*
+ * since MemTableSizeLimitReached is already set to true, and flush
+ * exception is disabled, this time memtable snapshot should be flushed
+ *
+ * NOTE(review): this re-adds entry (1, 5), which was already added
+ * above; confirm whether (1, 6) was intended.
+ */
+ assertTrue("EntryMemTable SnapShot is expected to be empty",
memTable.snapshot.isEmpty());
+ assertEquals("Flusher called", 2,
flushTestSortedLedgerStorage.getNumOfTimesFlushSnapshotCalled());
+ }
+
+ String[] createAndGetLedgerDirs(int numOfLedgerDirs) throws IOException { // creates numOfLedgerDirs temp dirs, runs Bookie.checkDirectoryStructure on each one's current dir, returns their absolute paths
+ File ledgerDir;
+ File curDir;
+ String[] ledgerDirsPath = new String[numOfLedgerDirs];
+ for (int i = 0; i < numOfLedgerDirs; i++) {
+ ledgerDir = createTempDir("bkTest", ".dir");
+ curDir = Bookie.getCurrentDirectory(ledgerDir);
+ Bookie.checkDirectoryStructure(curDir);
+ ledgerDirsPath[i] = ledgerDir.getAbsolutePath();
+ }
+ return ledgerDirsPath;
+ }
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
index 50844f3..68e3eeb 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
@@ -27,25 +27,46 @@ import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
/**
* Test the EntryMemTable class.
*/
+@RunWith(Parameterized.class)
public class TestEntryMemTable implements CacheCallback, SkipListFlusher,
CheckpointSource {
+ private Class entryMemTableClass;
private EntryMemTable memTable;
private final Random random = new Random();
private TestCheckPoint curCheckpoint = new TestCheckPoint(0, 0);
+ @Parameters // each test runs once per memtable implementation listed below
+ public static Collection<Object[]> memTableClass() {
+ return Arrays.asList(new Object[][] { { EntryMemTable.class }, {
EntryMemTableWithParallelFlusher.class } });
+ }
+ /** @param entryMemTableClass the EntryMemTable implementation that setUp() instantiates (NOTE(review): raw Class; Class<? extends EntryMemTable> would be more precise). */
+ public TestEntryMemTable(Class entryMemTableClass) {
+ this.entryMemTableClass = entryMemTableClass;
+ }
+
@Override
public Checkpoint newCheckpoint() {
return curCheckpoint;
@@ -58,8 +79,18 @@ public class TestEntryMemTable implements CacheCallback,
SkipListFlusher, Checkp
@Before
public void setUp() throws Exception {
- this.memTable = new
EntryMemTable(TestBKConfiguration.newServerConfiguration(),
- this, NullStatsLogger.INSTANCE);
+ if (entryMemTableClass.equals(EntryMemTableWithParallelFlusher.class))
{
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ this.memTable = new EntryMemTableWithParallelFlusher(conf, this,
NullStatsLogger.INSTANCE);
+ } else {
+ this.memTable = new
EntryMemTable(TestBKConfiguration.newServerConfiguration(), this,
+ NullStatsLogger.INSTANCE);
+ }
+ }
+
+ @After
+ public void cleanup() throws Exception{ // shuts down EntryMemTableWithParallelFlusher's executor; a no-op for plain EntryMemTable
+ this.memTable.close();
}
@Test
@@ -134,9 +165,9 @@ public class TestEntryMemTable implements CacheCallback,
SkipListFlusher, Checkp
}
private class KVFLusher implements SkipListFlusher {
- final HashSet<EntryKeyValue> keyValues;
+ final Set<EntryKeyValue> keyValues;
- KVFLusher(final HashSet<EntryKeyValue> keyValues) {
+ KVFLusher(final Set<EntryKeyValue> keyValues) {
this.keyValues = keyValues;
}
@@ -160,7 +191,7 @@ public class TestEntryMemTable implements CacheCallback,
SkipListFlusher, Checkp
*/
@Test
public void testFlushLogMark() throws IOException {
- HashSet<EntryKeyValue> flushedKVs = new HashSet<EntryKeyValue>();
+ Set<EntryKeyValue> flushedKVs = Collections.newSetFromMap(new
ConcurrentHashMap<EntryKeyValue, Boolean>());
KVFLusher flusher = new KVFLusher(flushedKVs);
curCheckpoint.setCheckPoint(2, 2);
@@ -195,7 +226,7 @@ public class TestEntryMemTable implements CacheCallback,
SkipListFlusher, Checkp
@Test
public void testFlushSnapshot() throws IOException {
HashSet<EntryKeyValue> keyValues = new HashSet<EntryKeyValue>();
- HashSet<EntryKeyValue> flushedKVs = new HashSet<EntryKeyValue>();
+ Set<EntryKeyValue> flushedKVs = Collections.newSetFromMap(new
ConcurrentHashMap<EntryKeyValue, Boolean>());
KVFLusher flusher = new KVFLusher(flushedKVs);
byte[] data = new byte[10];
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 946e6af..ad73a01 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -420,6 +420,9 @@ ledgerDirectories=/tmp/bk-data
# number of active ledgers are written to a bookie.
# entryLogPerLedgerEnabled=false
+# When entryLogPerLedgerEnabled is true, the memtable is flushed in parallel
+# by this many threads.
+# numOfMemtableFlushThreads=8
+
#############################################################################
## Entry log compaction settings
#############################################################################
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index fd3ab7a..481d828 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -409,6 +409,10 @@ groups:
description: |
How many index pages provided in ledger cache. If number of index pages
reaches this limitation, bookie server starts to swap some ledgers from memory
to disk. You can increment this value when you found swap became more frequent.
But make sure pageLimit*pageSize should not more than JVM max memory
limitation, otherwise you would got OutOfMemoryException. In general,
incrementing pageLimit, using smaller index page would gain bettern performance
in lager number of ledgers with fewe [...]
default: "-1"
+ - param: numOfMemtableFlushThreads
+ description: |
+ When entryLogPerLedgerEnabled is true, SortedLedgerStorage flushes entries from the memtable using an OrderedExecutor with numOfMemtableFlushThreads threads.
+ default: 8
- name: DB Ledger Storage Settings
params:
--
To stop receiving notification emails like this one, please contact
[email protected].