nodece commented on code in PR #24222: URL: https://github.com/apache/pulsar/pull/24222#discussion_r2125901799
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java: ########## @@ -5489,4 +5489,129 @@ protected CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub return null; })); } + + protected CompletableFuture<MessageId> internalGetMessageIDByIndexAsync(Long index, boolean authoritative) { + if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) { + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "GetMessageIDByIndex is not allowed when broker entry metadata is disabled")); + } + if (index == null || index < 0) { + return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, + "Invalid message index: " + index)); + } + int partitionIndex = topicName.getPartitionIndex(); + CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES); + return future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenCompose(__ -> { + if (topicName.isPartitioned()) { + return CompletableFuture.completedFuture(null); + } else { + return getPartitionedTopicMetadataAsync(topicName, authoritative, false) + .thenAccept(topicMetadata -> { + if (topicMetadata.partitions > 0) { + log.warn("[{}] Not supported getMessageIdByIndex operation on " + + "partitioned-topic {}", clientAppId(), topicName); + throw new RestException(Status.METHOD_NOT_ALLOWED, + "GetMessageIDByIndex is not allowed on partitioned-topic"); + } + }); + } + }).thenCompose(ignore -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenCompose(topic -> { + if (!(topic instanceof PersistentTopic persistentTopic)) { + log.error("[{}] Get message id by index on a non-persistent topic {} is not allowed", + clientAppId(), topicName); + return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED, + "Get message id by index on a non-persistent topic is not allowed")); + } + ManagedLedger managedLedger = persistentTopic.getManagedLedger(); + return findMessageIndexByPosition( + PositionFactory.create(managedLedger.getFirstPosition().getLedgerId(), 0), + managedLedger) + .thenCompose(firstIndex -> { + if (index < firstIndex) { + return CompletableFuture.completedFuture(PositionFactory.EARLIEST); + } else { + return managedLedger.asyncFindPosition(entry -> { + try { + Long messageIndex = getIndexAndRelease(entry); + if (messageIndex == null) { + return false; // Skip messages without index + } else { + // If the message index is less than the requested index, + // we continue searching + return messageIndex < index; + } + } catch (IOException e) { + log.error("Error deserialize message for message position find", e); + return false; + } + }); + } + }).thenCompose(position -> { + Position lastPosition = managedLedger.getLastConfirmedEntry(); + if (position == null || position.compareTo(lastPosition) > 0) { + return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, Review Comment: Could you add a test to cover this case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org