liangyepianzhou commented on code in PR #24222:
URL: https://github.com/apache/pulsar/pull/24222#discussion_r2126017133


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5489,4 +5489,131 @@ protected 
CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub
                             return null;
                         }));
     }
+
+    protected CompletableFuture<MessageId> 
internalGetMessageIDByIndexAsync(Long index, boolean authoritative) {
+        if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) {
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "GetMessageIDByIndex is not allowed when broker entry 
metadata is disabled"));
+        }
+        if (index == null || index < 0) {
+            return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+                    "Invalid message index: " + index));
+        }
+        int partitionIndex = topicName.getPartitionIndex();
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
+        return future.thenCompose(__ -> {
+                    if (topicName.isGlobal()) {
+                        return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                }).thenCompose(__ -> {
+                    if (topicName.isPartitioned()) {
+                        return CompletableFuture.completedFuture(null);
+                    } else {
+                        return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
+                                .thenAccept(topicMetadata -> {
+                                    if (topicMetadata.partitions > 0) {
+                                        log.warn("[{}] Not supported 
getMessageIdByIndex operation on "
+                                                        + "partitioned-topic 
{}", clientAppId(), topicName);
+                                        throw new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                                "GetMessageIDByIndex is not 
allowed on partitioned-topic");
+                                    }
+                                });
+                    }
+                }).thenCompose(ignore -> 
validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    if (!(topic instanceof PersistentTopic persistentTopic)) {
+                        log.error("[{}] Get message id by index on a 
non-persistent topic {} is not allowed",
+                                clientAppId(), topicName);
+                        return FutureUtil.failedFuture(new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                "Get message id by index on a non-persistent 
topic is not allowed"));
+                    }
+                    ManagedLedger managedLedger = 
persistentTopic.getManagedLedger();
+                    return findMessageIndexByPosition(
+                            
PositionFactory.create(managedLedger.getFirstPosition().getLedgerId(), 0),
+                            managedLedger)
+                            .thenCompose(firstIndex -> {
+                                if (index < firstIndex) {
+                                    return 
CompletableFuture.completedFuture(PositionFactory.EARLIEST);
+                                } else {
+                                    return 
managedLedger.asyncFindPosition(entry -> {
+                                        try {
+                                            Long messageIndex = 
getIndexFromEntry(entry);
+                                            if (messageIndex == null) {
+                                                return false; // Skip messages 
without index
+                                            } else {
+                                                // If the message index is 
less than the requested index,
+                                                // we continue searching
+                                                return messageIndex < index;
+                                            }
+                                        } catch (IOException e) {
+                                            log.error("Error deserialize 
message for message position find", e);
+                                            return false;
+                                        } finally {
+                                            entry.release();
+                                        }
+                                    });
+                                }
+                            }).thenCompose(position -> {
+                                Position lastPosition = 
managedLedger.getLastConfirmedEntry();
+                                if (position == null || 
position.compareTo(lastPosition) > 0) {
+                                    return FutureUtil.failedFuture(new 
RestException(Status.NOT_FOUND,
+                                            "Message not found for index " + 
index));
+                                } else {
+                                    return 
CompletableFuture.completedFuture(position);
+                                }
+                            });
+                }).thenCompose(position -> CompletableFuture.completedFuture(
+                        new MessageIdImpl(position.getLedgerId(), 
position.getEntryId(), partitionIndex)));
+    }
+
+    protected CompletableFuture<Long> findMessageIndexByPosition(Position 
position, ManagedLedger managedLedger) {
+        CompletableFuture<Long> indexFuture = new CompletableFuture<>();
+        managedLedger.asyncReadEntry(position, new 
AsyncCallbacks.ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                try {
+                    Long index = getIndexFromEntry(entry);
+                    if (index == null) {
+                        indexFuture.completeExceptionally(new 
RestException(Status.PRECONDITION_FAILED,
+                                "Broker entry metadata is not present in the 
message"));
+                    } else if (index < 0) {
+                        indexFuture.completeExceptionally(new 
RestException(Status.PRECONDITION_FAILED,
+                                "Invalid message index: " + index));
+                    } else {
+                        indexFuture.complete(index);
+                    }
+                } catch (IOException e) {
+                    indexFuture.completeExceptionally(new 
RestException(Status.INTERNAL_SERVER_ERROR,
+                            "Failed to get index from entry: " + 
e.getMessage()));
+                } finally {
+                    entry.release();
+                }
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+                log.error("[{}] Failed to read position {} on topic {}",
+                        clientAppId(), position, topicName, exception);
+                indexFuture.completeExceptionally(exception);
+            }
+        }, null);
+        return indexFuture;
+    }
+
+
+    private static Long getIndexFromEntry(Entry entry) throws IOException {
+        try {
+            final var brokerEntryMetadata = 
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
+            if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) 
{
+                return brokerEntryMetadata.getIndex();
+            } else {
+                return null;
+            }
+        } catch (Throwable throwable) {
+            throw new IOException(throwable);

Review Comment:
   `Exception Translation` is a common practice. I think it is reasonable to 
just raise IOException here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to