Technoboy- commented on a change in pull request #13876:
URL: https://github.com/apache/pulsar/pull/13876#discussion_r789396438
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2383,42 +2383,55 @@ private PositionImpl calculatePositionAckSet(boolean
isExcluded, int batchSize,
protected void internalGetMessageById(AsyncResponse asyncResponse, long
ledgerId, long entryId,
boolean authoritative) {
- try {
- // will redirect if the topic not owned by current broker
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
-
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
- ManagedLedgerImpl ledger = (ManagedLedgerImpl)
topic.getManagedLedger();
- ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new
AsyncCallbacks.ReadEntryCallback() {
- @Override
- public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
- asyncResponse.resume(new RestException(exception));
- }
+ // will redirect if the topic not owned by current broker
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.PEEK_MESSAGES))
+ .thenCompose(__ -> {
+ CompletableFuture<Void> ret;
+ if (topicName.isGlobal()) {
+ ret =
validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ ret = CompletableFuture.completedFuture(null);
+ }
+ return ret.thenCompose(ignore -> {
+ return getTopicReferenceAsync(topicName)
+ .thenAccept(topic -> {
+ ManagedLedgerImpl ledger =
+ (ManagedLedgerImpl)
((PersistentTopic) topic).getManagedLedger();
+ ledger.asyncReadEntry(new
PositionImpl(ledgerId, entryId),
+ new
AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void
readEntryFailed(ManagedLedgerException exception,
+
Object ctx) {
+ asyncResponse.resume(new
RestException(exception));
+ }
- @Override
- public void readEntryComplete(Entry entry, Object ctx) {
- try {
- asyncResponse.resume(generateResponseWithEntry(entry));
- } catch (IOException exception) {
- asyncResponse.resume(new RestException(exception));
- } finally {
- if (entry != null) {
- entry.release();
- }
+ @Override
+ public void
readEntryComplete(Entry entry, Object ctx) {
+ try {
+
asyncResponse.resume(generateResponseWithEntry(entry));
+ } catch (IOException
exception) {
+
asyncResponse.resume(new RestException(exception));
+ } finally {
+ if (entry != null) {
+ entry.release();
+ }
+ }
+ }
+ }, null);
+ });
+ });
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ if (cause instanceof NullPointerException) {
Review comment:
Yes, maybe we will fix this in another new pr. If fixed here, will be
confused.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]