This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new f3d4a00 Additional error checks in TwoPhasesCompactor (#9133)
f3d4a00 is described below
commit f3d4a003c0ed1bdf1112ac6fb8bc0bf8254fcbe1
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Jan 5 22:48:18 2021 -0800
Additional error checks in TwoPhasesCompactor (#9133)
If there is an exception in the `whenComplete()` block, the exception is
"eaten" up and the `loopPromise` is never set to failed.
Instead, it's better to use the `thenAccept().exceptionally()` to ensure
that any exception in the first handler will be propagated back.
(cherry picked from commit 984bf837489b7c6e5024d42a74fd729461baaf1e)
---
.../pulsar/compaction/TwoPhaseCompactor.java | 307 +++++++++++----------
1 file changed, 159 insertions(+), 148 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 78673a3..cd1aaee 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -88,20 +88,21 @@ public class TwoPhaseCompactor extends Compactor {
Map<String,MessageId> latestForKey = new HashMap<>();
CompletableFuture<PhaseOneResult> loopPromise = new
CompletableFuture<>();
- reader.getLastMessageIdAsync().whenComplete(
- (lastMessageId, exception) -> {
- if (exception != null) {
- loopPromise.completeExceptionally(exception);
- } else {
- log.info("Commencing phase one of compaction for {},
reading to {}",
- reader.getTopic(), lastMessageId);
- // Each entry is processed as a whole, discard the
batchIndex part deliberately.
- MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId;
- MessageIdImpl lastEntryMessageId = new
MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(),
lastImpl.getPartitionIndex());
- phaseOneLoop(reader, Optional.empty(),
Optional.empty(), lastEntryMessageId, latestForKey,
- loopPromise);
- }
+ reader.getLastMessageIdAsync()
+ .thenAccept(lastMessageId -> {
+ log.info("Commencing phase one of compaction for {},
reading to {}",
+ reader.getTopic(), lastMessageId);
+ // Each entry is processed as a whole, discard the
batchIndex part deliberately.
+ MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId;
+ MessageIdImpl lastEntryMessageId = new
MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(),
+ lastImpl.getPartitionIndex());
+ phaseOneLoop(reader, Optional.empty(), Optional.empty(),
lastEntryMessageId, latestForKey,
+ loopPromise);
+ }).exceptionally(ex -> {
+ loopPromise.completeExceptionally(ex);
+ return null;
});
+
return loopPromise;
}
@@ -116,60 +117,59 @@ public class TwoPhaseCompactor extends Compactor {
}
CompletableFuture<RawMessage> future = reader.readNextAsync();
scheduleTimeout(future);
- future.whenCompleteAsync(
- (m, exception) -> {
+
+ future.thenAcceptAsync(m -> {
+ try {
+ MessageId id = m.getMessageId();
+ boolean deletedMessage = false;
+ if (RawBatchConverter.isReadableBatch(m)) {
try {
- if (exception != null) {
- loopPromise.completeExceptionally(exception);
- return;
- }
- MessageId id = m.getMessageId();
- boolean deletedMessage = false;
- if (RawBatchConverter.isReadableBatch(m)) {
- try {
- for (ImmutableTriple<MessageId, String,
Integer> e :
-
RawBatchConverter.extractIdsAndKeysAndSize(m)) {
- if (e != null) {
- if (e.getRight() > 0) {
- latestForKey.put(e.getMiddle(),
e.getLeft());
- } else {
- deletedMessage = true;
- latestForKey.remove(e.getMiddle());
- }
- }
- }
- } catch (IOException ioe) {
- log.info("Error decoding batch for message {}.
Whole batch will be included in output",
- id, ioe);
- }
- } else {
- Pair<String,Integer> keyAndSize =
extractKeyAndSize(m);
- if (keyAndSize != null) {
- if(keyAndSize.getRight() > 0) {
- latestForKey.put(keyAndSize.getLeft(), id);
+ for (ImmutableTriple<MessageId, String, Integer> e :
RawBatchConverter
+ .extractIdsAndKeysAndSize(m)) {
+ if (e != null) {
+ if (e.getRight() > 0) {
+ latestForKey.put(e.getMiddle(),
e.getLeft());
} else {
deletedMessage = true;
- latestForKey.remove(keyAndSize.getLeft());
+ latestForKey.remove(e.getMiddle());
}
}
}
-
- MessageId first = firstMessageId.orElse(deletedMessage
? null : id);
- MessageId to = deletedMessage ?
toMessageId.orElse(null) : id;
- if (id.compareTo(lastMessageId) == 0) {
- loopPromise.complete(new PhaseOneResult(first ==
null ? id : first, to == null ? id : to,
- lastMessageId, latestForKey));
+ } catch (IOException ioe) {
+ log.info("Error decoding batch for message {}. Whole
batch will be included in output",
+ id, ioe);
+ }
+ } else {
+ Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
+ if (keyAndSize != null) {
+ if (keyAndSize.getRight() > 0) {
+ latestForKey.put(keyAndSize.getLeft(), id);
} else {
- phaseOneLoop(reader,
- Optional.ofNullable(first),
- Optional.ofNullable(to),
- lastMessageId,
- latestForKey, loopPromise);
+ deletedMessage = true;
+ latestForKey.remove(keyAndSize.getLeft());
}
- } finally {
- m.close();
}
- }, scheduler);
+ }
+
+ MessageId first = firstMessageId.orElse(deletedMessage ? null
: id);
+ MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
+ if (id.compareTo(lastMessageId) == 0) {
+ loopPromise.complete(new PhaseOneResult(first == null ? id
: first, to == null ? id : to,
+ lastMessageId, latestForKey));
+ } else {
+ phaseOneLoop(reader,
+ Optional.ofNullable(first),
+ Optional.ofNullable(to),
+ lastMessageId,
+ latestForKey, loopPromise);
+ }
+ } finally {
+ m.close();
+ }
+ }, scheduler).exceptionally(ex -> {
+ loopPromise.completeExceptionally(ex);
+ return null;
+ });
}
private void scheduleTimeout(CompletableFuture<RawMessage> future) {
@@ -224,86 +224,85 @@ public class TwoPhaseCompactor extends Compactor {
if (promise.isDone()) {
return;
}
- reader.readNextAsync().whenCompleteAsync(
- (m, exception) -> {
- if (exception != null) {
- promise.completeExceptionally(exception);
- return;
- } else if (promise.isDone()) {
- m.close();
- return;
- }
+ reader.readNextAsync().thenAcceptAsync(m -> {
+ if (promise.isDone()) {
+ m.close();
+ return;
+ }
+ try {
+ MessageId id = m.getMessageId();
+ Optional<RawMessage> messageToAdd = Optional.empty();
+ if (RawBatchConverter.isReadableBatch(m)) {
try {
- MessageId id = m.getMessageId();
- Optional<RawMessage> messageToAdd = Optional.empty();
- if (RawBatchConverter.isReadableBatch(m)) {
- try {
- messageToAdd =
RawBatchConverter.rebatchMessage(
- m, (key, subid) ->
subid.equals(latestForKey.get(key)));
- } catch (IOException ioe) {
- log.info("Error decoding batch for message {}.
Whole batch will be included in output",
- id, ioe);
- messageToAdd = Optional.of(m);
- }
- } else {
- Pair<String,Integer> keyAndSize =
extractKeyAndSize(m);
- MessageId msg;
- if (keyAndSize == null) { // pass through messages
without a key
- messageToAdd = Optional.of(m);
- } else if ((msg =
latestForKey.get(keyAndSize.getLeft())) != null
- && msg.equals(id)) { // consider message
only if present into latestForKey map
- if (keyAndSize.getRight() <= 0) {
- promise.completeExceptionally(new
IllegalArgumentException(
- "Compaction phase found empty
record from sorted key-map"));
- }
- messageToAdd = Optional.of(m);
- }
+ messageToAdd = RawBatchConverter.rebatchMessage(
+ m, (key, subid) ->
subid.equals(latestForKey.get(key)));
+ } catch (IOException ioe) {
+ log.info("Error decoding batch for message {}. Whole
batch will be included in output",
+ id, ioe);
+ messageToAdd = Optional.of(m);
+ }
+ } else {
+ Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
+ MessageId msg;
+ if (keyAndSize == null) { // pass through messages without
a key
+ messageToAdd = Optional.of(m);
+ } else if ((msg = latestForKey.get(keyAndSize.getLeft()))
!= null
+ && msg.equals(id)) { // consider message only if
present into latestForKey map
+ if (keyAndSize.getRight() <= 0) {
+ promise.completeExceptionally(new
IllegalArgumentException(
+ "Compaction phase found empty record from
sorted key-map"));
}
+ messageToAdd = Optional.of(m);
+ }
+ }
- if (messageToAdd.isPresent()) {
- RawMessage message = messageToAdd.get();
- try {
- outstanding.acquire();
- CompletableFuture<Void> addFuture =
addToCompactedLedger(lh, message)
- .whenComplete((res, exception2) -> {
- outstanding.release();
- if (exception2 != null) {
-
promise.completeExceptionally(exception2);
- }
- });
- if (to.equals(id)) {
- addFuture.whenComplete((res, exception2)
-> {
- if (exception2 == null) {
- promise.complete(null);
- }
- });
- }
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- promise.completeExceptionally(ie);
- } finally {
- if (message != m) {
- message.close();
+ if (messageToAdd.isPresent()) {
+ RawMessage message = messageToAdd.get();
+ try {
+ outstanding.acquire();
+ CompletableFuture<Void> addFuture =
addToCompactedLedger(lh, message)
+ .whenComplete((res, exception2) -> {
+ outstanding.release();
+ if (exception2 != null) {
+
promise.completeExceptionally(exception2);
+ }
+ });
+ if (to.equals(id)) {
+ addFuture.whenComplete((res, exception2) -> {
+ if (exception2 == null) {
+ promise.complete(null);
}
- }
- } else if (to.equals(id)) {
- // Reached to last-id and phase-one found it
deleted-message while iterating on ledger so,
- // not present under latestForKey. Complete the
compaction.
- try {
- // make sure all inflight writes have finished
- outstanding.acquire(MAX_OUTSTANDING);
- promise.complete(null);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- promise.completeExceptionally(e);
- }
- return;
+ });
}
- phaseTwoLoop(reader, to, latestForKey, lh,
outstanding, promise);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ promise.completeExceptionally(ie);
} finally {
- m.close();
+ if (message != m) {
+ message.close();
+ }
+ }
+ } else if (to.equals(id)) {
+ // Reached to last-id and phase-one found it
deleted-message while iterating on ledger so,
+ // not present under latestForKey. Complete the compaction.
+ try {
+ // make sure all inflight writes have finished
+ outstanding.acquire(MAX_OUTSTANDING);
+ promise.complete(null);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ promise.completeExceptionally(e);
}
- }, scheduler);
+ return;
+ }
+ phaseTwoLoop(reader, to, latestForKey, lh, outstanding,
promise);
+ } finally {
+ m.close();
+ }
+ }, scheduler).exceptionally(ex -> {
+ promise.completeExceptionally(ex);
+ return null;
+ });
}
private CompletableFuture<LedgerHandle> createLedger(BookKeeper bk,
Map<String,byte[]> metadata) {
@@ -331,40 +330,52 @@ public class TwoPhaseCompactor extends Compactor {
private CompletableFuture<Void> deleteLedger(BookKeeper bk, LedgerHandle
lh) {
CompletableFuture<Void> bkf = new CompletableFuture<>();
- bk.asyncDeleteLedger(lh.getId(),
- (rc, ctx) -> {
- if (rc != BKException.Code.OK) {
-
bkf.completeExceptionally(BKException.create(rc));
- } else {
- bkf.complete(null);
- }
- }, null);
+ try {
+ bk.asyncDeleteLedger(lh.getId(),
+ (rc, ctx) -> {
+ if (rc != BKException.Code.OK) {
+ bkf.completeExceptionally(BKException.create(rc));
+ } else {
+ bkf.complete(null);
+ }
+ }, null);
+ } catch (Throwable t) {
+ return FutureUtil.failedFuture(t);
+ }
return bkf;
}
private CompletableFuture<Void> closeLedger(LedgerHandle lh) {
CompletableFuture<Void> bkf = new CompletableFuture<>();
- lh.asyncClose((rc, ledger, ctx) -> {
+ try {
+ lh.asyncClose((rc, ledger, ctx) -> {
if (rc != BKException.Code.OK) {
bkf.completeExceptionally(BKException.create(rc));
} else {
bkf.complete(null);
}
}, null);
+ } catch (Throwable t) {
+ return FutureUtil.failedFuture(t);
+ }
return bkf;
}
private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh,
RawMessage m) {
CompletableFuture<Void> bkf = new CompletableFuture<>();
ByteBuf serialized = m.serialize();
- lh.asyncAddEntry(serialized,
- (rc, ledger, eid, ctx) -> {
- if (rc != BKException.Code.OK) {
-
bkf.completeExceptionally(BKException.create(rc));
- } else {
- bkf.complete(null);
- }
- }, null);
+ try {
+ lh.asyncAddEntry(serialized,
+ (rc, ledger, eid, ctx) -> {
+ if (rc != BKException.Code.OK) {
+ bkf.completeExceptionally(BKException.create(rc));
+ } else {
+ bkf.complete(null);
+ }
+ }, null);
+ } catch (Throwable t) {
+ return FutureUtil.failedFuture(t);
+ }
return bkf;
}