This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new f3d4a00  Additional error checks in TwoPhasesCompactor (#9133)
f3d4a00 is described below

commit f3d4a003c0ed1bdf1112ac6fb8bc0bf8254fcbe1
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Jan 5 22:48:18 2021 -0800

    Additional error checks in TwoPhasesCompactor (#9133)
    
    If an exception is thrown inside the `whenComplete()` block, it is
    silently swallowed and the `loopPromise` is never completed exceptionally.
    
    Instead, it is better to use `thenAccept().exceptionally()`, which ensures
    that any exception thrown in the first handler is propagated to the promise.
    
    (cherry picked from commit 984bf837489b7c6e5024d42a74fd729461baaf1e)
---
 .../pulsar/compaction/TwoPhaseCompactor.java       | 307 +++++++++++----------
 1 file changed, 159 insertions(+), 148 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 78673a3..cd1aaee 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -88,20 +88,21 @@ public class TwoPhaseCompactor extends Compactor {
         Map<String,MessageId> latestForKey = new HashMap<>();
         CompletableFuture<PhaseOneResult> loopPromise = new 
CompletableFuture<>();
 
-        reader.getLastMessageIdAsync().whenComplete(
-                (lastMessageId, exception) -> {
-                    if (exception != null) {
-                        loopPromise.completeExceptionally(exception);
-                    } else {
-                        log.info("Commencing phase one of compaction for {}, 
reading to {}",
-                                 reader.getTopic(), lastMessageId);
-                        // Each entry is processed as a whole, discard the 
batchIndex part deliberately.
-                        MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId;
-                        MessageIdImpl lastEntryMessageId = new 
MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(), 
lastImpl.getPartitionIndex());
-                        phaseOneLoop(reader, Optional.empty(), 
Optional.empty(), lastEntryMessageId, latestForKey,
-                                loopPromise);
-                    }
+        reader.getLastMessageIdAsync()
+                .thenAccept(lastMessageId -> {
+                    log.info("Commencing phase one of compaction for {}, 
reading to {}",
+                            reader.getTopic(), lastMessageId);
+                    // Each entry is processed as a whole, discard the 
batchIndex part deliberately.
+                    MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId;
+                    MessageIdImpl lastEntryMessageId = new 
MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(),
+                            lastImpl.getPartitionIndex());
+                    phaseOneLoop(reader, Optional.empty(), Optional.empty(), 
lastEntryMessageId, latestForKey,
+                            loopPromise);
+                }).exceptionally(ex -> {
+                    loopPromise.completeExceptionally(ex);
+                    return null;
                 });
+
         return loopPromise;
     }
 
@@ -116,60 +117,59 @@ public class TwoPhaseCompactor extends Compactor {
         }
         CompletableFuture<RawMessage> future = reader.readNextAsync();
         scheduleTimeout(future);
-        future.whenCompleteAsync(
-                (m, exception) -> {
+
+        future.thenAcceptAsync(m -> {
+            try {
+                MessageId id = m.getMessageId();
+                boolean deletedMessage = false;
+                if (RawBatchConverter.isReadableBatch(m)) {
                     try {
-                        if (exception != null) {
-                            loopPromise.completeExceptionally(exception);
-                            return;
-                        }
-                        MessageId id = m.getMessageId();
-                        boolean deletedMessage = false;
-                        if (RawBatchConverter.isReadableBatch(m)) {
-                            try {
-                                for (ImmutableTriple<MessageId, String, 
Integer> e :
-                                        
RawBatchConverter.extractIdsAndKeysAndSize(m)) {
-                                    if (e != null) {
-                                        if (e.getRight() > 0) {
-                                            latestForKey.put(e.getMiddle(), 
e.getLeft());
-                                        } else {
-                                            deletedMessage = true;
-                                            latestForKey.remove(e.getMiddle());
-                                        }
-                                    }
-                                }
-                            } catch (IOException ioe) {
-                                log.info("Error decoding batch for message {}. 
Whole batch will be included in output",
-                                         id, ioe);
-                            }
-                        } else {
-                            Pair<String,Integer> keyAndSize = 
extractKeyAndSize(m);
-                            if (keyAndSize != null) {
-                                if(keyAndSize.getRight() > 0) {
-                                    latestForKey.put(keyAndSize.getLeft(), id);
+                        for (ImmutableTriple<MessageId, String, Integer> e : 
RawBatchConverter
+                                .extractIdsAndKeysAndSize(m)) {
+                            if (e != null) {
+                                if (e.getRight() > 0) {
+                                    latestForKey.put(e.getMiddle(), 
e.getLeft());
                                 } else {
                                     deletedMessage = true;
-                                    latestForKey.remove(keyAndSize.getLeft());
+                                    latestForKey.remove(e.getMiddle());
                                 }
                             }
                         }
-
-                        MessageId first = firstMessageId.orElse(deletedMessage 
? null : id);
-                        MessageId to = deletedMessage ? 
toMessageId.orElse(null) : id;
-                        if (id.compareTo(lastMessageId) == 0) {
-                            loopPromise.complete(new PhaseOneResult(first == 
null ? id : first, to == null ? id : to,
-                                    lastMessageId, latestForKey));
+                    } catch (IOException ioe) {
+                        log.info("Error decoding batch for message {}. Whole 
batch will be included in output",
+                                id, ioe);
+                    }
+                } else {
+                    Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
+                    if (keyAndSize != null) {
+                        if (keyAndSize.getRight() > 0) {
+                            latestForKey.put(keyAndSize.getLeft(), id);
                         } else {
-                            phaseOneLoop(reader,
-                                         Optional.ofNullable(first),
-                                         Optional.ofNullable(to),
-                                         lastMessageId,
-                                         latestForKey, loopPromise);
+                            deletedMessage = true;
+                            latestForKey.remove(keyAndSize.getLeft());
                         }
-                    } finally {
-                        m.close();
                     }
-                }, scheduler);
+                }
+
+                MessageId first = firstMessageId.orElse(deletedMessage ? null 
: id);
+                MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
+                if (id.compareTo(lastMessageId) == 0) {
+                    loopPromise.complete(new PhaseOneResult(first == null ? id 
: first, to == null ? id : to,
+                            lastMessageId, latestForKey));
+                } else {
+                    phaseOneLoop(reader,
+                            Optional.ofNullable(first),
+                            Optional.ofNullable(to),
+                            lastMessageId,
+                            latestForKey, loopPromise);
+                }
+            } finally {
+                m.close();
+            }
+        }, scheduler).exceptionally(ex -> {
+            loopPromise.completeExceptionally(ex);
+            return null;
+        });
     }
 
     private void scheduleTimeout(CompletableFuture<RawMessage> future) {
@@ -224,86 +224,85 @@ public class TwoPhaseCompactor extends Compactor {
         if (promise.isDone()) {
             return;
         }
-        reader.readNextAsync().whenCompleteAsync(
-                (m, exception) -> {
-                    if (exception != null) {
-                        promise.completeExceptionally(exception);
-                        return;
-                    } else if (promise.isDone()) {
-                        m.close();
-                        return;
-                    }
+        reader.readNextAsync().thenAcceptAsync(m -> {
+            if (promise.isDone()) {
+                m.close();
+                return;
+            }
+            try {
+                MessageId id = m.getMessageId();
+                Optional<RawMessage> messageToAdd = Optional.empty();
+                if (RawBatchConverter.isReadableBatch(m)) {
                     try {
-                        MessageId id = m.getMessageId();
-                        Optional<RawMessage> messageToAdd = Optional.empty();
-                        if (RawBatchConverter.isReadableBatch(m)) {
-                            try {
-                                messageToAdd = 
RawBatchConverter.rebatchMessage(
-                                        m, (key, subid) -> 
subid.equals(latestForKey.get(key)));
-                            } catch (IOException ioe) {
-                                log.info("Error decoding batch for message {}. 
Whole batch will be included in output",
-                                        id, ioe);
-                                messageToAdd = Optional.of(m);
-                            }
-                        } else {
-                            Pair<String,Integer> keyAndSize = 
extractKeyAndSize(m);
-                            MessageId msg;
-                            if (keyAndSize == null) { // pass through messages 
without a key
-                                messageToAdd = Optional.of(m);
-                            } else if ((msg = 
latestForKey.get(keyAndSize.getLeft())) != null
-                                    && msg.equals(id)) { // consider message 
only if present into latestForKey map
-                                if (keyAndSize.getRight() <= 0) {
-                                    promise.completeExceptionally(new 
IllegalArgumentException(
-                                            "Compaction phase found empty 
record from sorted key-map"));
-                                }
-                                messageToAdd = Optional.of(m);
-                            }
+                        messageToAdd = RawBatchConverter.rebatchMessage(
+                                m, (key, subid) -> 
subid.equals(latestForKey.get(key)));
+                    } catch (IOException ioe) {
+                        log.info("Error decoding batch for message {}. Whole 
batch will be included in output",
+                                id, ioe);
+                        messageToAdd = Optional.of(m);
+                    }
+                } else {
+                    Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
+                    MessageId msg;
+                    if (keyAndSize == null) { // pass through messages without 
a key
+                        messageToAdd = Optional.of(m);
+                    } else if ((msg = latestForKey.get(keyAndSize.getLeft())) 
!= null
+                            && msg.equals(id)) { // consider message only if 
present into latestForKey map
+                        if (keyAndSize.getRight() <= 0) {
+                            promise.completeExceptionally(new 
IllegalArgumentException(
+                                    "Compaction phase found empty record from 
sorted key-map"));
                         }
+                        messageToAdd = Optional.of(m);
+                    }
+                }
 
-                        if (messageToAdd.isPresent()) {
-                            RawMessage message = messageToAdd.get();
-                            try {
-                                outstanding.acquire();
-                                CompletableFuture<Void> addFuture = 
addToCompactedLedger(lh, message)
-                                        .whenComplete((res, exception2) -> {
-                                            outstanding.release();
-                                            if (exception2 != null) {
-                                                
promise.completeExceptionally(exception2);
-                                            }
-                                        });
-                                if (to.equals(id)) {
-                                    addFuture.whenComplete((res, exception2) 
-> {
-                                        if (exception2 == null) {
-                                            promise.complete(null);
-                                        }
-                                    });
-                                }
-                            } catch (InterruptedException ie) {
-                                Thread.currentThread().interrupt();
-                                promise.completeExceptionally(ie);
-                            } finally {
-                                if (message != m) {
-                                    message.close();
+                if (messageToAdd.isPresent()) {
+                    RawMessage message = messageToAdd.get();
+                    try {
+                        outstanding.acquire();
+                        CompletableFuture<Void> addFuture = 
addToCompactedLedger(lh, message)
+                                .whenComplete((res, exception2) -> {
+                                    outstanding.release();
+                                    if (exception2 != null) {
+                                        
promise.completeExceptionally(exception2);
+                                    }
+                                });
+                        if (to.equals(id)) {
+                            addFuture.whenComplete((res, exception2) -> {
+                                if (exception2 == null) {
+                                    promise.complete(null);
                                 }
-                            }
-                        } else if (to.equals(id)) {
-                            // Reached to last-id and phase-one found it 
deleted-message while iterating on ledger so,
-                            // not present under latestForKey. Complete the 
compaction.
-                            try {
-                                // make sure all inflight writes have finished
-                                outstanding.acquire(MAX_OUTSTANDING);
-                                promise.complete(null);
-                            } catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                                promise.completeExceptionally(e);
-                            }
-                            return;
+                            });
                         }
-                        phaseTwoLoop(reader, to, latestForKey, lh, 
outstanding, promise);
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        promise.completeExceptionally(ie);
                     } finally {
-                        m.close();
+                        if (message != m) {
+                            message.close();
+                        }
+                    }
+                } else if (to.equals(id)) {
+                    // Reached to last-id and phase-one found it 
deleted-message while iterating on ledger so,
+                    // not present under latestForKey. Complete the compaction.
+                    try {
+                        // make sure all inflight writes have finished
+                        outstanding.acquire(MAX_OUTSTANDING);
+                        promise.complete(null);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        promise.completeExceptionally(e);
                     }
-                }, scheduler);
+                    return;
+                }
+                phaseTwoLoop(reader, to, latestForKey, lh, outstanding, 
promise);
+            } finally {
+                m.close();
+            }
+        }, scheduler).exceptionally(ex -> {
+            promise.completeExceptionally(ex);
+            return null;
+        });
     }
 
     private CompletableFuture<LedgerHandle> createLedger(BookKeeper bk, 
Map<String,byte[]> metadata) {
@@ -331,40 +330,52 @@ public class TwoPhaseCompactor extends Compactor {
 
     private CompletableFuture<Void> deleteLedger(BookKeeper bk, LedgerHandle 
lh) {
         CompletableFuture<Void> bkf = new CompletableFuture<>();
-        bk.asyncDeleteLedger(lh.getId(),
-                             (rc, ctx) -> {
-                                 if (rc != BKException.Code.OK) {
-                                     
bkf.completeExceptionally(BKException.create(rc));
-                                 } else {
-                                     bkf.complete(null);
-                                 }
-                             }, null);
+        try {
+            bk.asyncDeleteLedger(lh.getId(),
+                    (rc, ctx) -> {
+                        if (rc != BKException.Code.OK) {
+                            bkf.completeExceptionally(BKException.create(rc));
+                        } else {
+                            bkf.complete(null);
+                        }
+                    }, null);
+        } catch (Throwable t) {
+            return FutureUtil.failedFuture(t);
+        }
         return bkf;
     }
 
     private CompletableFuture<Void> closeLedger(LedgerHandle lh) {
         CompletableFuture<Void> bkf = new CompletableFuture<>();
-        lh.asyncClose((rc, ledger, ctx) -> {
+        try {
+            lh.asyncClose((rc, ledger, ctx) -> {
                 if (rc != BKException.Code.OK) {
                     bkf.completeExceptionally(BKException.create(rc));
                 } else {
                     bkf.complete(null);
                 }
             }, null);
+        } catch (Throwable t) {
+            return FutureUtil.failedFuture(t);
+        }
         return bkf;
     }
 
     private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, 
RawMessage m) {
         CompletableFuture<Void> bkf = new CompletableFuture<>();
         ByteBuf serialized = m.serialize();
-        lh.asyncAddEntry(serialized,
-                         (rc, ledger, eid, ctx) -> {
-                             if (rc != BKException.Code.OK) {
-                                 
bkf.completeExceptionally(BKException.create(rc));
-                             } else {
-                                 bkf.complete(null);
-                             }
-                         }, null);
+        try {
+            lh.asyncAddEntry(serialized,
+                    (rc, ledger, eid, ctx) -> {
+                        if (rc != BKException.Code.OK) {
+                            bkf.completeExceptionally(BKException.create(rc));
+                        } else {
+                            bkf.complete(null);
+                        }
+                    }, null);
+        } catch (Throwable t) {
+            return FutureUtil.failedFuture(t);
+        }
         return bkf;
     }
 

Reply via email to