liangyepianzhou commented on code in PR #24222: URL: https://github.com/apache/pulsar/pull/24222#discussion_r2125990993
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java: ########## @@ -5489,4 +5489,114 @@ protected CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub return null; })); } + + protected CompletableFuture<MessageId> internalGetMessageIDByIndexAsync(Long index, boolean authoritative) { + if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) { + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "GetMessageIDByIndex is not allowed when broker entry metadata is disabled")); + } + if (index == null || index < 0) { + return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, + "Invalid message index: " + index)); + } + int partitionIndex = topicName.getPartitionIndex(); + CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES); + return future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenCompose(__ -> { + if (topicName.isPartitioned()) { + return CompletableFuture.completedFuture(null); + } else { + return getPartitionedTopicMetadataAsync(topicName, authoritative, false) + .thenAccept(topicMetadata -> { + if (topicMetadata.partitions > 0) { + log.warn("[{}] Not supported getMessageIdByIndex operation on " + + "partitioned-topic {}", clientAppId(), topicName); + throw new RestException(Status.METHOD_NOT_ALLOWED, + "GetMessageIDByIndex is not allowed on partitioned-topic"); + } + }); + } + }).thenCompose(ignore -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenCompose(topic -> { + if (!(topic instanceof PersistentTopic persistentTopic)) { + log.error("[{}] Get message id by index on a non-persistent topic {} is not allowed", + clientAppId(), topicName); + return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED, + "Get message id by index on a non-persistent topic is not allowed")); + } + ManagedLedger managedLedger = persistentTopic.getManagedLedger(); + return findMessageIndexByPosition( + PositionFactory.create(managedLedger.getFirstPosition().getLedgerId(), 0), Review Comment: >findMessageIndexByPosition(managedLedger.getFirstPosition(), ...) is better. @nodece `managedLedger.getFirstPosition()` will return [first ledger Id, -1] -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org