reddycharan commented on a change in pull request #1201: ISSUE #570: Entrylog per ledger
URL: https://github.com/apache/bookkeeper/pull/1201#discussion_r171048267
##########
File path:
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
##########
@@ -263,6 +285,81 @@ private long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint)
         return size;
     }
+    long flushSnapshotParallelly(final SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
+        AtomicLong flushedSize = new AtomicLong();
+        if (this.snapshot.compareTo(checkpoint) < 0) {
+            synchronized (this) {
+                EntrySkipList keyValues = this.snapshot;
+                if (keyValues.compareTo(checkpoint) < 0) {
+                    NavigableSet<EntryKey> keyValuesSet = keyValues.keySet();
+                    Map<Long, List<EntryKeyValue>> entryKeyValuesMap = new HashMap<Long, List<EntryKeyValue>>();
+
+                    for (EntryKey key : keyValuesSet) {
+                        EntryKeyValue kv = (EntryKeyValue) key;
+                        Long ledger = kv.getLedgerId();
+                        if (!entryKeyValuesMap.containsKey(ledger)) {
+                            entryKeyValuesMap.put(ledger, new LinkedList<EntryKeyValue>());
+                        }
+                        entryKeyValuesMap.get(ledger).add(kv);
+                    }
+
+                    CountDownLatch latch = new CountDownLatch(entryKeyValuesMap.size());
+                    AtomicBoolean isFlushThreadInterrupted = new AtomicBoolean(false);
+                    AtomicReference<Exception> exceptionWhileFlushingParallelly = new AtomicReference<Exception>();
+                    Thread mainFlushThread = Thread.currentThread();
+
+                    for (Long ledgerId : entryKeyValuesMap.keySet()) {
+                        List<EntryKeyValue> entryKeyValuesOfALedger = entryKeyValuesMap.get(ledgerId);
+                        flushExecutor.submitOrdered(ledgerId, new SafeRunnable() {
+                            @Override
+                            public void safeRun() {
+                                for (EntryKeyValue entryKeyValue : entryKeyValuesOfALedger) {
+                                    try {
+                                        flusher.process(ledgerId, entryKeyValue.getEntryId(),
+                                                entryKeyValue.getValueAsByteBuffer());
+                                        flushedSize.addAndGet(entryKeyValue.getLength());
+                                    } catch (NoLedgerException exception) {
+                                        logger.info("Got NoLedgerException while flushing entry: {}. The ledger "
+                                                + "must be deleted " + "after this entry is added to the Memtable",
+                                                entryKeyValue);
+                                        break;
+                                    } catch (Exception exc) {
+                                        logger.error(
+                                                "Got Exception while trying to flush process entry: " + entryKeyValue,
+                                                exc);
+                                        if (isFlushThreadInterrupted.compareAndSet(false, true)) {
+                                            exceptionWhileFlushingParallelly.set(exc);
+                                            mainFlushThread.interrupt();
+                                        }
+                                        // return without countdowning the latch since we got unexpected Exception
+                                        return;
+                                    }
+                                }
+                                latch.countDown();
+                            }
+                        });
+                    }
+
+                    try {
+                        while (!latch.await(memtableFlushTimeoutInSeconds, TimeUnit.SECONDS)) {
Review comment:
Actually, I'm not doing much with the await logic here. Initially I wanted
real timeout handling, but based on our internal feedback I toned it down,
so the timeout is now used only for logging: when the flush doesn't complete
within "memtableFlushTimeoutInSeconds", a message is logged and the thread
goes back to waiting.
Anyway, if you want me to remove the config altogether, I can remove it and
use a static value (120 secs) here instead. But I feel it is helpful to log
this line whenever the flush doesn't complete within the
memtableFlushTimeoutInSeconds period, before going back to waiting.
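
For illustration, the await-and-log loop described above can be sketched in
isolation. This is a minimal sketch, not the PR's code: the class
AwaitWithLogging, the method awaitWithLogging, and the Consumer<String> log
parameter are hypothetical names chosen for this example. The only real API
relied on is the JDK's CountDownLatch.await(long, TimeUnit), which returns
false when the wait times out before the count reaches zero.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper for illustration only; not part of BookKeeper.
class AwaitWithLogging {

    /**
     * Blocks until the latch reaches zero. The timeout never aborts the wait;
     * each time it elapses, a message is passed to the log callback and the
     * wait is retried. Returns the number of timeouts that were logged.
     */
    static int awaitWithLogging(CountDownLatch latch, long timeout, TimeUnit unit,
                                Consumer<String> log) throws InterruptedException {
        int timeouts = 0;
        // await(...) returns false on timeout, true once the count hits zero.
        while (!latch.await(timeout, unit)) {
            timeouts++;
            log.accept("Flush did not complete within " + timeout + " " + unit
                    + " (timeout #" + timeouts + "), waiting again");
        }
        return timeouts;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        // Simulated flush task that finishes after a short delay.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(250);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            latch.countDown();
        });
        worker.start();
        int timeouts = awaitWithLogging(latch, 100, TimeUnit.MILLISECONDS, System.out::println);
        System.out.println("Flush finished after " + timeouts + " logged timeout(s)");
    }
}
```

With this shape the timeout value only controls how often the "still
waiting" line is logged, so replacing the config with a fixed value such as
120 seconds would not change whether the flush eventually completes.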