liangyepianzhou commented on code in PR #24222: URL: https://github.com/apache/pulsar/pull/24222#discussion_r2126017133
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java: ########## @@ -5489,4 +5489,131 @@ protected CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub return null; })); } + + protected CompletableFuture<MessageId> internalGetMessageIDByIndexAsync(Long index, boolean authoritative) { + if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) { + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "GetMessageIDByIndex is not allowed when broker entry metadata is disabled")); + } + if (index == null || index < 0) { + return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, + "Invalid message index: " + index)); + } + int partitionIndex = topicName.getPartitionIndex(); + CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES); + return future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenCompose(__ -> { + if (topicName.isPartitioned()) { + return CompletableFuture.completedFuture(null); + } else { + return getPartitionedTopicMetadataAsync(topicName, authoritative, false) + .thenAccept(topicMetadata -> { + if (topicMetadata.partitions > 0) { + log.warn("[{}] Not supported getMessageIdByIndex operation on " + + "partitioned-topic {}", clientAppId(), topicName); + throw new RestException(Status.METHOD_NOT_ALLOWED, + "GetMessageIDByIndex is not allowed on partitioned-topic"); + } + }); + } + }).thenCompose(ignore -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenCompose(topic -> { + if (!(topic instanceof PersistentTopic persistentTopic)) { + log.error("[{}] Get message id by index on a non-persistent topic {} is not allowed", + clientAppId(), topicName); + return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED, + "Get message id by index on a non-persistent topic is not allowed")); + } + ManagedLedger managedLedger = persistentTopic.getManagedLedger(); + return findMessageIndexByPosition( + PositionFactory.create(managedLedger.getFirstPosition().getLedgerId(), 0), + managedLedger) + .thenCompose(firstIndex -> { + if (index < firstIndex) { + return CompletableFuture.completedFuture(PositionFactory.EARLIEST); + } else { + return managedLedger.asyncFindPosition(entry -> { + try { + Long messageIndex = getIndexFromEntry(entry); + if (messageIndex == null) { + return false; // Skip messages without index + } else { + // If the message index is less than the requested index, + // we continue searching + return messageIndex < index; + } + } catch (IOException e) { + log.error("Error deserialize message for message position find", e); + return false; + } finally { + entry.release(); + } + }); + } + }).thenCompose(position -> { + Position lastPosition = managedLedger.getLastConfirmedEntry(); + if (position == null || position.compareTo(lastPosition) > 0) { + return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, + "Message not found for index " + index)); + } else { + return CompletableFuture.completedFuture(position); + } + }); + }).thenCompose(position -> CompletableFuture.completedFuture( + new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex))); + } + + protected CompletableFuture<Long> findMessageIndexByPosition(Position position, ManagedLedger managedLedger) { + CompletableFuture<Long> indexFuture = new CompletableFuture<>(); + managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + try { + Long index = getIndexFromEntry(entry); + if (index == null) { + indexFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, + "Broker entry metadata is not present in the message")); + } else if (index < 0) { + indexFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, + "Invalid message index: " + index)); + } else { + indexFuture.complete(index); + } + } catch (IOException e) { + indexFuture.completeExceptionally(new RestException(Status.INTERNAL_SERVER_ERROR, + "Failed to get index from entry: " + e.getMessage())); + } finally { + entry.release(); + } + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + log.error("[{}] Failed to read position {} on topic {}", + clientAppId(), position, topicName, exception); + indexFuture.completeExceptionally(exception); + } + }, null); + return indexFuture; + } + + + private static Long getIndexFromEntry(Entry entry) throws IOException { + try { + final var brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer()); + if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) { + return brokerEntryMetadata.getIndex(); + } else { + return null; + } + } catch (Throwable throwable) { + throw new IOException(throwable); Review Comment: `Exception Translation` is a common practice. I think it is reasonable to just raise IOException here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org