codelipenghui commented on a change in pull request #11139:
URL: https://github.com/apache/pulsar/pull/11139#discussion_r660397751



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
##########
@@ -796,4 +798,55 @@ public void testSetReplicatedSubscriptionStatus() {
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
     }
 
+    @Test
+    public void testGetMessageIdByTimestamp() throws Exception {
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("tenant-xyz", tenantInfo);
+        admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
+        final String topicName = "persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestamp";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        AtomicLong publishTime = new AtomicLong(0);
+        ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false)
+                .intercept(new ProducerInterceptor() {
+                    @Override
+                    public void close() {
+
+                    }
+
+                    @Override
+                    public boolean eligible(Message message) {
+                        return true;
+                    }
+
+                    @Override
+                    public Message beforeSend(Producer producer, Message message) {
+                        return message;
+                    }
+
+                    @Override
+                    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
+                                                      Throwable exception) {
+                        publishTime.set(message.getPublishTime());
+                    }
+                })
+                .create();
+
+        MessageId id1 = producer.send("test1".getBytes());
+        long publish1 = publishTime.get();
+
+        Thread.sleep(10);

Review comment:
       Can this be replaced with Awaitility? Using sleep here will introduce a 
flaky test.
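
       To illustrate the suggestion, here is a minimal sketch of waiting on a 
condition instead of sleeping for a fixed time. It uses only the JDK so that it 
stands alone; `PollUntil` and its `await` method are hypothetical names, not 
part of the PR or of Awaitility. With Awaitility on the test classpath, the 
same wait would normally be expressed with `Awaitility.await().until(...)`. The 
condition shown (the wall clock moving past the first publish time) is an 
assumption about what the sleep was meant to guarantee.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical stdlib-only stand-in for an Awaitility-style wait; not part of the PR.
final class PollUntil {
    private PollUntil() {}

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition held before the deadline, false on
     * timeout or if the waiting thread was interrupted.
     */
    static boolean await(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition holding
            }
            try {
                Thread.sleep(Math.max(1L, pollMillis));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Stand-in for the publish time captured by the interceptor in the test.
        long publish1 = System.currentTimeMillis();
        // Wait until the clock has moved past publish1, so that a second
        // message would receive a strictly later publish time.
        boolean advanced = await(() -> System.currentTimeMillis() > publish1, 5000, 1);
        System.out.println("clock advanced: " + advanced);
    }
}
```

       The wait ends as soon as the condition holds and fails explicitly on 
timeout, rather than depending on 10 ms always being long enough.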

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1515,6 +1515,39 @@ public void getMessageById(
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/messageid/{timestamp}")
+    @ApiOperation(value = "Get message id at or after this absolute timestamp (in ms).")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+                    + "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic is not non-partitioned and persistent"),
+            @ApiResponse(code = 412, message = "Topic name is not valid"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
+    public void getMessageIdByTimestamp(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Specify the timestamp", required = true)
+            @PathParam("timestamp") long timestamp,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGetMessageIdByTimestamp(asyncResponse, timestamp, authoritative);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));

Review comment:
       `internalGetMessageIdByTimestamp` can throw a RestException, so resuming 
the asyncResponse with `new RestException(e)` here would wrap a RestException 
inside another RestException, which will be a problem.
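
       A minimal sketch of the concern, under stated assumptions. The 
`RestException` class below is a simplified, hypothetical stand-in and is NOT 
Pulsar's real class; in particular, the assumption that wrapping an arbitrary 
throwable produces a generic 500 is for illustration only. The helper 
`toRestException` is likewise a hypothetical name. The idea is to pass an 
existing RestException through unchanged and wrap only other exception types.

```java
// Hypothetical stand-ins for illustration; these are NOT Pulsar's real classes.
final class RestExceptionPassThrough {
    private RestExceptionPassThrough() {}

    /** Simplified model of an exception that carries an HTTP status code. */
    static final class RestException extends RuntimeException {
        final int status;

        RestException(int status, String message) {
            super(message);
            this.status = status;
        }

        // Assumed behaviour for this sketch: wrapping any throwable gives a generic 500.
        RestException(Throwable cause) {
            super(cause.getMessage(), cause);
            this.status = 500;
        }
    }

    /** Returns t unchanged if it is already a RestException, otherwise wraps it. */
    static RestException toRestException(Throwable t) {
        if (t instanceof RestException) {
            return (RestException) t; // keep the original status (e.g. 404)
        }
        return new RestException(t);
    }

    public static void main(String[] args) {
        RestException notFound = new RestException(404, "Topic does not exist");
        // Unconditional wrapping loses the status in this simplified model.
        System.out.println("wrapped status: " + new RestException(notFound).status);
        // Passing it through keeps the status.
        System.out.println("passed-through status: " + toRestException(notFound).status);
    }
}
```

       In the catch block under review, the equivalent would be to resume the 
asyncResponse with the caught exception itself when it is already a 
RestException.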

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2328,6 +2330,61 @@ public void readEntryComplete(Entry entry, Object ctx) {
         }
     }
 
+    protected void internalGetMessageIdByTimestamp(AsyncResponse asyncResponse, long timestamp, boolean authoritative) {

Review comment:
       It's better to return CompletableFuture<MessageIdImpl> directly; it will 
make the code easier to read. See 
`PersistentTopics.setMaxSubscriptionsPerTopic` for an example.
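
       A rough sketch of the shape being suggested, using only the JDK. All 
names here (`FutureReturningLookup`, `MessageIdStub`, 
`findMessageIdByTimestamp`, `handle`) are hypothetical stand-ins and not 
Pulsar's API, and the lookup logic is a placeholder. The internal method 
returns a future, and the REST layer is the only place that turns the result or 
the failure into a response.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

// Hypothetical stand-ins; the names and types below are NOT Pulsar's real API.
final class FutureReturningLookup {
    private FutureReturningLookup() {}

    /** Simplified message id holding only a ledger id and an entry id. */
    static final class MessageIdStub {
        final long ledgerId;
        final long entryId;

        MessageIdStub(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    /** Internal layer: returns a future instead of receiving the async response. */
    static CompletableFuture<MessageIdStub> findMessageIdByTimestamp(long timestamp) {
        CompletableFuture<MessageIdStub> future = new CompletableFuture<>();
        if (timestamp < 0) {
            future.completeExceptionally(new IllegalArgumentException("negative timestamp"));
        } else {
            // Placeholder lookup: a real implementation would search the ledger.
            future.complete(new MessageIdStub(1L, timestamp % 10));
        }
        return future;
    }

    /** REST layer: the single place that resumes the response, for success or failure. */
    static void handle(long timestamp, Consumer<Object> resume) {
        findMessageIdByTimestamp(timestamp)
                .thenAccept(resume)
                .exceptionally(ex -> {
                    // Failures from an earlier stage arrive wrapped in CompletionException.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    resume.accept(cause);
                    return null;
                });
    }

    public static void main(String[] args) {
        handle(42L, r -> System.out.println("resumed with: " + r.getClass().getSimpleName()));
        handle(-1L, r -> System.out.println("resumed with: " + r.getClass().getSimpleName()));
    }
}
```

       With this shape the internal method no longer needs the AsyncResponse 
parameter, and error mapping lives in one place.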




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


