codelipenghui commented on a change in pull request #14062:
URL: https://github.com/apache/pulsar/pull/14062#discussion_r801667655
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2540,58 +2540,81 @@ protected Response internalPeekNthMessage(String
subName, int messagePosition, b
}
}
- protected Response internalExamineMessage(String initialPosition, long
messagePosition, boolean authoritative) {
+ protected void internalExamineMessageAsync(AsyncResponse asyncResponse,
String initialPosition,
+ long messagePosition, boolean
authoritative) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ future = CompletableFuture.completedFuture(null);
}
- if (!topicName.isPartitioned() &&
getPartitionedTopicMetadata(topicName,
- authoritative, false).partitions > 0) {
- throw new RestException(Status.METHOD_NOT_ALLOWED,
- "Examine messages on a partitioned topic is not allowed, "
- + "please try examine message on specific topic
partition");
- }
- validateTopicOwnership(topicName, authoritative);
- if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
- log.error("[{}] Not supported operation of non-persistent topic {}
", clientAppId(), topicName);
- throw new RestException(Status.METHOD_NOT_ALLOWED,
- "Examine messages on a non-persistent topic is not
allowed");
- }
+ long msgPos = messagePosition < 1 ? 1 : messagePosition;
+ String initPos = initialPosition == null ? "latest" : initialPosition;
- if (messagePosition < 1) {
- messagePosition = 1;
- }
- if (null == initialPosition) {
- initialPosition = "latest";
+ if (!topicName.isPartitioned()) {
+ future = future.thenCompose(__ ->
getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false))
+ .thenAccept(partitionedTopicMetadata -> {
+ if (partitionedTopicMetadata.partitions > 0) {
+ String msg = "Examine messages on a partitioned
topic is not allowed, "
+ + "please try examine message on specific
topic partition";
+ log.warn("[{}] {} topicName: {}", clientAppId(),
msg, topicName);
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
msg);
+ }
+ });
}
- try {
- PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
- long totalMessage = topic.getNumberOfEntries();
- PositionImpl startPosition = topic.getFirstPosition();
- long messageToSkip =
- initialPosition.equals("earliest") ? messagePosition :
totalMessage - messagePosition + 1;
- CompletableFuture<Entry> future = new CompletableFuture<>();
- PositionImpl readPosition = topic.getPositionAfterN(startPosition,
messageToSkip);
- topic.asyncReadEntry(readPosition, new
AsyncCallbacks.ReadEntryCallback() {
- @Override
- public void readEntryComplete(Entry entry, Object ctx) {
- future.complete(entry);
- }
+ future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ if (!(topic instanceof PersistentTopic)) {
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Examine messages on a non-persistent topic is
not allowed");
+ }
- @Override
- public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
- future.completeExceptionally(exception);
- }
- }, null);
- return generateResponseWithEntry(future.get());
- } catch (Exception exception) {
- exception.printStackTrace();
- log.error("[{}] Failed to examine message at position {} from {}
due to {}", clientAppId(), messagePosition,
- topicName, exception);
- throw new RestException(exception);
- }
+ try {
+ PersistentTopic persistentTopic = (PersistentTopic)
topic;
+ long totalMessage =
persistentTopic.getNumberOfEntries();
+ PositionImpl startPosition =
persistentTopic.getFirstPosition();
+ long messageToSkip =
+ initPos.equals("earliest") ? msgPos :
totalMessage - msgPos + 1;
+ CompletableFuture<Entry> readEntry = new
CompletableFuture<>();
+ PositionImpl readPosition =
persistentTopic.getPositionAfterN(startPosition, messageToSkip);
+ persistentTopic.asyncReadEntry(readPosition, new
AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object
ctx) {
+ readEntry.complete(entry);
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException
exception, Object ctx) {
+ readEntry.completeExceptionally(exception);
+ }
+ }, null);
+ return readEntry;
+ } catch (Exception exception) {
Review comment:
```suggestion
} catch (ManagedLedgerException exception) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]