This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8350352152ce7c64cae033b51770ad8756688089
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon Jun 23 14:02:09 2025 +0800

    [feat][admin] PIP-415: Support getting message ID by index (#24222)
    
    PIP:https://github.com/apache/pulsar/pull/24220
    
     we can now obtain the offset of a message by its message id:
    
    1. Get the message by id using `get-message-by-id` cmd
    2. Get the index of the message using `Message.getIndex()`
    
    But we cannot obtain the message id by offset. Then we need to add a new 
API to get the message id by offset.
    
    Add a new http API to retrieve the message ID by offset.
    We propose to add a new API to retrieve the message ID by offset, enabling 
us to cache the mapping between message ID and offset.
    This will allow us to use offsets for seek and acknowledgment operations 
when consuming messages through the standardized API.
    
    (cherry picked from commit ed28c2166cb68955d00b28345049709196fd7083)
---
 merged_prs.txt                                     |   6 +
 .../broker/admin/impl/PersistentTopicsBase.java    | 128 +++++++++++++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  33 ++++++
 .../broker/service/BrokerEntryMetadataE2ETest.java |  96 ++++++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  57 +++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  26 +++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   3 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  18 +++
 run.py                                             |   7 ++
 9 files changed, 373 insertions(+), 1 deletion(-)

diff --git a/merged_prs.txt b/merged_prs.txt
new file mode 100644
index 00000000000..b109e2e1d2f
--- /dev/null
+++ b/merged_prs.txt
@@ -0,0 +1,6 @@
+218
+169
+27
+9
+3
+2
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a48df69d1d1..27d605f81d1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -5344,4 +5344,132 @@ public class PersistentTopicsBase extends AdminResource 
{
                             return null;
                         }));
     }
+
+    protected CompletableFuture<MessageId> 
internalGetMessageIDByIndexAsync(Long index, boolean authoritative) {
+        if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) {
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "GetMessageIDByIndex is not allowed when broker entry 
metadata is disabled"));
+        }
+        if (index == null || index < 0) {
+            return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+                    "Invalid message index: " + index));
+        }
+        int partitionIndex = topicName.getPartitionIndex();
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
+        return future.thenCompose(__ -> {
+                    if (topicName.isGlobal()) {
+                        return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                }).thenCompose(__ -> {
+                    if (topicName.isPartitioned()) {
+                        return CompletableFuture.completedFuture(null);
+                    } else {
+                        return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
+                                .thenAccept(topicMetadata -> {
+                                    if (topicMetadata.partitions > 0) {
+                                        log.warn("[{}] Not supported 
getMessageIdByIndex operation on "
+                                                        + "partitioned-topic 
{}", clientAppId(), topicName);
+                                        throw new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                                "GetMessageIDByIndex is not 
allowed on partitioned-topic");
+                                    }
+                                });
+                    }
+                }).thenCompose(ignore -> 
validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    if (!(topic instanceof PersistentTopic persistentTopic)) {
+                        log.error("[{}] Get message id by index on a 
non-persistent topic {} is not allowed",
+                                clientAppId(), topicName);
+                        return FutureUtil.failedFuture(new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                "Get message id by index on a non-persistent 
topic is not allowed"));
+                    }
+                    ManagedLedger managedLedger = 
persistentTopic.getManagedLedger();
+                    Position lastPosition = 
managedLedger.getLastConfirmedEntry();
+                    Position firstPosition = managedLedger.getFirstPosition();
+                    if (firstPosition == null || lastPosition == null || 
firstPosition.equals(lastPosition)) {
+                        return FutureUtil.failedFuture(new 
RestException(Status.NOT_FOUND,
+                                "No messages found in topic " + topicName));
+                    }
+                    return findMessageIndexByPosition(
+                            
PositionFactory.create(firstPosition.getLedgerId(), 0),
+                            managedLedger)
+                            .thenCompose(firstIndex -> {
+                                if (index < firstIndex) {
+                                    return 
CompletableFuture.completedFuture(PositionFactory.EARLIEST);
+                                } else {
+                                    return 
managedLedger.asyncFindPosition(entry -> {
+                                        try {
+                                            Long messageIndex = 
getIndexFromEntry(entry);
+                                            if (messageIndex == null) {
+                                                return false; // Skip messages 
without index
+                                            } else {
+                                                // If the message index is 
less than the requested index,
+                                                // we continue searching
+                                                return messageIndex < index;
+                                            }
+                                        } catch (Throwable e) {
+                                            log.error("Error deserialize 
message for message position find", e);
+                                            return false;
+                                        } finally {
+                                            entry.release();
+                                        }
+                                    });
+                                }
+                            }).thenCompose(position -> {
+                                if (position.compareTo(lastPosition) > 0) {
+                                    return FutureUtil.failedFuture(new 
RestException(Status.NOT_FOUND,
+                                            "Message not found for index " + 
index));
+                                } else {
+                                    return 
CompletableFuture.completedFuture(position);
+                                }
+                            });
+                }).thenCompose(position -> CompletableFuture.completedFuture(
+                        new MessageIdImpl(position.getLedgerId(), 
position.getEntryId(), partitionIndex)));
+    }
+
+    protected CompletableFuture<Long> findMessageIndexByPosition(Position 
position, ManagedLedger managedLedger) {
+        CompletableFuture<Long> indexFuture = new CompletableFuture<>();
+        managedLedger.asyncReadEntry(position, new 
AsyncCallbacks.ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                try {
+                    Long index = getIndexFromEntry(entry);
+                    if (index == null) {
+                        indexFuture.completeExceptionally(new 
RestException(Status.PRECONDITION_FAILED,
+                                "Broker entry metadata is not present in the 
message"));
+                    } else if (index < 0) {
+                        indexFuture.completeExceptionally(new 
RestException(Status.PRECONDITION_FAILED,
+                                "Invalid message index: " + index));
+                    } else {
+                        indexFuture.complete(index);
+                    }
+                } catch (Throwable e) {
+                    indexFuture.completeExceptionally(new 
RestException(Status.INTERNAL_SERVER_ERROR,
+                            "Failed to get index from entry: " + 
e.getMessage()));
+                } finally {
+                    entry.release();
+                }
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+                log.error("[{}] Failed to read position {} on topic {}",
+                        clientAppId(), position, topicName, exception);
+                indexFuture.completeExceptionally(exception);
+            }
+        }, null);
+        return indexFuture;
+    }
+
+
+    private static Long getIndexFromEntry(Entry entry) {
+        final var brokerEntryMetadata = 
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
+            return brokerEntryMetadata.getIndex();
+        } else {
+            return null;
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 16b07afd1b7..5ee04d8817f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -5095,5 +5095,38 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                 });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/getMessageIdByIndex")
+    @ApiOperation(hidden = true, value = "Get Message ID by index.",
+            notes = "If the specified index is a system message, "
+                    + "it will return the message id of the later message.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace or partitioned topic 
does not exist, "
+                    + "or the index is invalid"),
+            @ApiResponse(code = 406, message = "The topic is not a persistent 
topic"),
+            @ApiResponse(code = 412, message = "The broker is not enable 
broker entry metadata"),
+    })
+    public void getMessageIDByIndex(@Suspended final AsyncResponse 
asyncResponse,
+                                    @PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace,
+                                    @PathParam("topic") @Encoded String 
encodedTopic,
+                                    @QueryParam("index") long index,
+                                    @QueryParam("authoritative") 
@DefaultValue("false")
+                                                   boolean authoritative){
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalGetMessageIDByIndexAsync(index, authoritative)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get message id by index for 
topic {}, partition id {}, index {}",
+                                clientAppId(), topicName, index, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentTopics.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index d7dbe34b8fd..4966330c33c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -19,25 +19,32 @@
 package org.apache.pulsar.broker.service;
 
 import static 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import javax.ws.rs.NotAllowedException;
+import javax.ws.rs.NotFoundException;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.assertj.core.api.ThrowableAssert;
 import org.assertj.core.util.Sets;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -419,4 +426,93 @@ public class BrokerEntryMetadataE2ETest extends 
BrokerTestBase {
 
         cursor.close();
     }
+
+    @Test
+    public void testGetMessageIdByIndex() throws Exception {
+        // 1. test no partitioned topic
+        final String topicName = newTopicName();
+        admin.topics().createNonPartitionedTopic(topicName);
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+        MessageIdImpl messageId = (MessageIdImpl) producer.send("test");
+        Message<byte[]>
+                message =
+                admin.topics().getMessagesById(topicName, 
messageId.getLedgerId(), messageId.getEntryId()).get(0);
+        long index = message.getIndex().get();
+        MessageIdImpl messageIdByIndex = (MessageIdImpl) 
admin.topics().getMessageIdByIndex(topicName, index);
+        Assert.assertEquals(messageIdByIndex, messageId);
+
+        // 2. test partitioned topic
+        final String topicName2 = newTopicName();
+        final String partitionedTopicName = topicName2 + "-partition-" + 0;
+        admin.topics().createPartitionedTopic(topicName2, 10);
+        @Cleanup
+        Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName2)
+                .enableBatching(false)
+                .create();
+
+        MessageIdImpl messageId2 = null;
+        for (int i = 0; i < 200; i++) {
+            messageId2 = (MessageIdImpl) producer2.send("test" + i);
+            if (messageId2.getPartitionIndex() == 0) {
+                break;
+            }
+        }
+        Message<byte[]>
+                message2 = admin.topics().getMessagesById(partitionedTopicName,
+                messageId2.getLedgerId(), messageId2.getEntryId()).get(0);
+        long index2 = message2.getIndex().get();
+        // 2.1 test partitioned topic name with partition index
+        MessageIdImpl messageIdByIndex2 =
+                (MessageIdImpl) 
admin.topics().getMessageIdByIndex(partitionedTopicName, index2);
+        Assert.assertEquals(messageIdByIndex2, messageId2);
+        // 2.2 test partitioned topic name without partition index
+        assertThrowsWithCause(() -> 
admin.topics().getMessageIdByIndex(topicName2, index2),
+                PulsarAdminException.class, NotAllowedException.class);
+
+        // 3. test invalid index
+        assertThrowsWithCause(() -> 
admin.topics().getMessageIdByIndex(topicName, -1),
+                PulsarAdminException.class, NotFoundException.class);
+
+        assertThrowsWithCause(() -> 
admin.topics().getMessageIdByIndex(topicName, 100000),
+                PulsarAdminException.class, NotFoundException.class);
+    }
+
+    @Test
+    public void testGetMessageIdByIndexForEmptyTopic() throws 
PulsarAdminException {
+        final String topicName = newTopicName();
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        assertThrowsWithCause(() -> 
admin.topics().getMessageIdByIndex(topicName, 0),
+                PulsarAdminException.class, NotFoundException.class);
+    }
+
+    @Test
+    public void testGetMessageIdByIndexOutOfIndex() throws 
PulsarAdminException, PulsarClientException {
+        final String topicName = newTopicName();
+        admin.topics().createNonPartitionedTopic(topicName);
+        @Cleanup
+        final Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+        for (int i = 0; i < 100; i++) {
+            producer.send("msg-" + i);
+        }
+
+        assertThrowsWithCause(() -> 
admin.topics().getMessageIdByIndex(topicName, 1000),
+                PulsarAdminException.class, NotFoundException.class);
+    }
+
+    private void assertThrowsWithCause(ThrowableAssert.ThrowingCallable 
executable,
+                                       Class<? extends Throwable> 
expectedException,
+                                       Class<? extends Throwable> 
expectedCause) {
+        assertThatThrownBy(executable)
+                .isInstanceOf(expectedException)
+                .hasRootCauseInstanceOf(expectedCause);
+    }
+
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index c681bd1a7bc..a2fcd60deb5 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -4558,4 +4558,61 @@ public interface Topics {
     default CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, 
String sourceTopic) {
         return createShadowTopicAsync(shadowTopic, sourceTopic, null);
     }
+
+    /**
+     * Get the message id by index. If the index points to a system message, 
return the first user message following it;
+     * if the specified message has expired and been deleted, return 
MessageId.Earliest.
+     * The messages without entry metadata will be skipped, and the next 
matched message whose index >= the specified
+     * index will be returned.
+     * @param topicName either the specific partition name of a partitioned 
topic (e.g. my-topic-partition-0)
+     *                 or the original topic name for non-partitioned topics.
+     * @param index the index of a message
+     * @return the message id of the message.
+     * When retrieving a message ID by index, the resolution is limited to the 
**entry** level (an entry is the minimal
+     * storage unit for messages in Pulsar's persistence layer).
+     * If message batching is enabled, a single entry may contain multiple 
messages with distinct indexes.
+     * Example Scenario (partition with 2 entries):
+     * | Entry | Ledger ID | Entry ID | Index | Messages |
+     * | :--- | ---: | ---: | ---: | ---: |
+     * | A | 0 | 0 | 2 | 0,1,2 |
+     * | B | 0 | 1 | 4 | 3,4 |
+     * Param with indexes 0,1,2 or 3,4 will return the **same MessageID** 
(e.g., `MessageId(0:0:*)` for Entry A).
+     * @throws NotAuthorizedException      (HTTP 403 Forbidden) Client lacks 
permissions to access the topic/namespace.
+     * @throws NotFoundException           (HTTP 404 Not Found) Source 
topic/namespace does not exist, or invalid index.
+     * @throws PulsarAdminException        (HTTP 406 Not Acceptable) Specified 
topic is not a persistent topic.
+     * @throws PreconditionFailedException (HTTP 412 Precondition Failed) 
Broker entry metadata is disabled.
+     * @throws PulsarAdminException        For other errors (e.g., HTTP 500 
Internal Server Error).
+     */
+    MessageId getMessageIdByIndex(String topicName, long index) throws 
PulsarAdminException;
+
+
+    /**
+     * Get the message id by index asynchronously. If the index points to a 
system message, return the first user
+     * message following it; if the specified message has expired and been 
deleted, return MessageId.Earliest.
+     * The messages without entry metadata will be skipped, and the next 
matched message whose index >= the specified
+     * index will be returned.
+     * @param topicName either the specific partition name of a partitioned 
topic (e.g. my-topic-partition-0) or
+     *                 the original topic name for non-partitioned topics.
+     * @param index the index of a message
+     * When retrieving a message ID by index, the resolution is limited to the 
**entry** level (an entry is the minimal
+     *              storage unit for messages in Pulsar's persistence layer).
+     * If message batching is enabled, a single entry may contain multiple 
messages with distinct indexes.
+     * Example Scenario (partition with 2 entries):
+     * | Entry | Ledger ID | Entry ID | Index | Messages |
+     * | :--- | ---: | ---: | ---: | ---: |
+     * | A | 0 | 0 | 2 | 0,1,2 |
+     * | B | 0 | 1 | 4 | 3,4 |
+     * Param with indexes 0,1,2 or 3,4 will return the **same MessageID** 
(e.g., `MessageId(0:0:*)` for Entry A).
+     * @implNote The return {@link CompletableFuture<MessageId>} that 
completes with the message id of the message.
+     *         The future may complete exceptionally with:
+     *         <ul>
+     *             <li>{@link NotAuthorizedException} (HTTP 403) Permission 
denied for topic/namespace access.</li>
+     *             <li>{@link NotFoundException} (HTTP 404) Shadow 
topic/namespace does not exist or invalid index.</li>
+     *             <li>{@link PulsarAdminException} (HTTP 406) Shadow topic is 
not a persistent topic.</li>
+     *             <li>{@link PreconditionFailedException} (HTTP 412) Broker 
entry metadata is not enabled.</li>
+     *             <li>{@link PulsarAdminException} (HTTP 307) Redirect 
required to the correct broker.</li>
+     *             <li>{@link PulsarAdminException} Other errors (e.g., HTTP 
500).</li>
+     *         </ul>
+     */
+    CompletableFuture<MessageId> getMessageIdByIndexAsync(String topicName, 
long index);
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 9c4a6eef753..8a1e0e0f839 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2819,5 +2819,31 @@ public class TopicsImpl extends BaseResource implements 
Topics {
         });
     }
 
+    @Override
+    public MessageId getMessageIdByIndex(String topicName, long index) throws 
PulsarAdminException {
+        return sync(() -> getMessageIdByIndexAsync(topicName, index));
+    }
+
+    @Override
+    public CompletableFuture<MessageId> getMessageIdByIndexAsync(String 
topicName, long index) {
+        final CompletableFuture<MessageId> messageIdCompletableFuture = new 
CompletableFuture<>();
+        TopicName topic = validateTopic(topicName);
+        WebTarget path = topicPath(topic, "getMessageIdByIndex");
+        path = path.queryParam("index", index);
+        asyncGetRequest(path, new InvocationCallback<MessageIdImpl>(){
+
+            @Override
+            public void completed(MessageIdImpl messageId) {
+                messageIdCompletableFuture.complete(messageId);
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                messageIdCompletableFuture.completeExceptionally(throwable);
+            }
+        });
+        return messageIdCompletableFuture;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(TopicsImpl.class);
 }
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 44035d396fa..675eca867a3 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -2176,7 +2176,8 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("get-shadow-source 
persistent://myprop/clust/ns1/ds1"));
         
verify(mockTopics).getShadowSource("persistent://myprop/clust/ns1/ds1");
 
-
+        cmdTopics.run(split("get-message-id-by-index 
persistent://myprop/clust/ns1/ds1 -i 0"));
+        
verify(mockTopics).getMessageIdByIndex("persistent://myprop/clust/ns1/ds1", 0);
 
     }
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index ca15e111390..b8ed4c30d87 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -275,6 +275,8 @@ public class CmdTopics extends CmdBase {
         addCommand("set-schema-validation-enforce", new 
SetSchemaValidationEnforced());
 
         addCommand("trim-topic", new TrimTopic());
+
+        addCommand("get-message-id-by-index", new GetMessageIdByIndex());
     }
 
     @Command(description = "Get the list of topics under a namespace.")
@@ -3058,4 +3060,20 @@ public class CmdTopics extends CmdBase {
             getAdmin().topics().trimTopic(topic);
         }
     }
+
+    @Command(description = "Get message id by index")
+    private class GetMessageIdByIndex extends CliCommand {
+
+        @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
+        private String topicName;
+
+        @Option(names = { "--index", "-i" }, description = "Index to get 
message id for the topic", required = true)
+        private Long index;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(topicName);
+            getAdmin().topics().getMessageIdByIndex(topic, index);
+        }
+    }
 }
diff --git a/run.py b/run.py
new file mode 100755
index 00000000000..38a8f501f73
--- /dev/null
+++ b/run.py
@@ -0,0 +1,7 @@
+# 影响力指数算法(权重可调)
+def calculate_impact(pr):
+    return (pr['comments']*0.3 + pr['reviews']*0.5 + 
pr['derived_commits']*1.2) * (1 + 0.1*pr['years_active'])
+
+# 使用本地git数据替代API
+git log --merges --grep "Merge pull request #" --format="%H %s" | awk '{print 
$5}' | cut -c2- > merged_prs.txt
+

Reply via email to