sijie commented on a change in pull request #1201: ISSUE #570: Entrylog per ledger
URL: https://github.com/apache/bookkeeper/pull/1201#discussion_r170556439
 
 

 ##########
 File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
 ##########
 @@ -263,6 +285,81 @@ private long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint)
         return size;
     }
 
+    long flushSnapshotParallelly(final SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
+        AtomicLong flushedSize = new AtomicLong();
+        if (this.snapshot.compareTo(checkpoint) < 0) {
+            synchronized (this) {
+                EntrySkipList keyValues = this.snapshot;
+                if (keyValues.compareTo(checkpoint) < 0) {
+                    NavigableSet<EntryKey> keyValuesSet = keyValues.keySet();
+                    Map<Long, List<EntryKeyValue>> entryKeyValuesMap = new HashMap<Long, List<EntryKeyValue>>();
+
+                    for (EntryKey key : keyValuesSet) {
+                        EntryKeyValue kv = (EntryKeyValue) key;
+                        Long ledger = kv.getLedgerId();
+                        if (!entryKeyValuesMap.containsKey(ledger)) {
+                            entryKeyValuesMap.put(ledger, new LinkedList<EntryKeyValue>());
+                        }
+                        entryKeyValuesMap.get(ledger).add(kv);
+                    }
+
+                    CountDownLatch latch = new CountDownLatch(entryKeyValuesMap.size());
+                    AtomicBoolean isFlushThreadInterrupted = new AtomicBoolean(false);
+                    AtomicReference<Exception> exceptionWhileFlushingParallelly =  new AtomicReference<Exception>();
+                    Thread mainFlushThread = Thread.currentThread();
+
+                    for (Long ledgerId : entryKeyValuesMap.keySet()) {
+                        List<EntryKeyValue> entryKeyValuesOfALedger = entryKeyValuesMap.get(ledgerId);
+                        flushExecutor.submitOrdered(ledgerId, new SafeRunnable() {
+                            @Override
+                            public void safeRun() {
+                                for (EntryKeyValue entryKeyValue : entryKeyValuesOfALedger) {
+                                    try {
+                                        flusher.process(ledgerId, entryKeyValue.getEntryId(),
+                                                entryKeyValue.getValueAsByteBuffer());
+                                        flushedSize.addAndGet(entryKeyValue.getLength());
+                                    } catch (NoLedgerException exception) {
+                                        logger.info("Got NoLedgerException while flushing entry: {}. The ledger "
+                                                + "must be deleted " + "after this entry is added to the Memtable",
+                                                entryKeyValue);
+                                        break;
+                                    } catch (Exception exc) {
+                                        logger.error(
+                                                "Got Exception while trying to flush process entry: " + entryKeyValue,
+                                                exc);
+                                        if (isFlushThreadInterrupted.compareAndSet(false, true)) {
+                                            exceptionWhileFlushingParallelly.set(exc);
+                                            mainFlushThread.interrupt();
+                                        }
+                                        // return without countdowning the latch since we got unexpected Exception
+                                        return;
+                                    }
+                                }
+                                latch.countDown();
+                            }
+                        });
+                    }
+
+                    try {
+                        while (!latch.await(memtableFlushTimeoutInSeconds, TimeUnit.SECONDS)) {
 
 Review comment:
   I would suggest removing the timeout altogether here and using `await()` 
instead. An untimed `await()` provides a natural backpressure mechanism: if a 
snapshot cannot be flushed, the throttling logic in the memtable kicks in and 
slows down the bookie by slowing down the journal thread, so the backpressure 
signal is passed back to clients.
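
   To illustrate the suggestion, here is a minimal standalone sketch of the 
untimed `await()` pattern. It is not the PR's code: the class name 
`UntimedAwaitSketch`, the method `flushAll`, the fixed thread pool, and the 
byte counts are all hypothetical stand-ins, and the worker body only simulates 
a call such as `flusher.process(...)`. Unlike the diff above, the sketch counts 
down in a `finally` block so the waiting thread cannot hang if a worker fails.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class UntimedAwaitSketch {

    // Hypothetical helper: runs `tasks` simulated flushes in parallel and
    // blocks the caller until every one of them has finished.
    static long flushAll(int tasks, long bytesPerTask) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(tasks);
        AtomicLong flushedSize = new AtomicLong();
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        // Stand-in for the real per-ledger flush work.
                        flushedSize.addAndGet(bytesPerTask);
                    } finally {
                        // Always count down so await() can return.
                        latch.countDown();
                    }
                });
            }
            // No timeout: the caller stays blocked for as long as the flush
            // takes, which is what lets the slowdown propagate upstream.
            latch.await();
            return flushedSize.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(flushAll(8, 100));
    }
}
```

   Because the caller only returns once all workers are done, a slow flush 
keeps the flushing thread occupied instead of timing out and retrying, and it 
remains interruptible through `InterruptedException`.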

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
