codelipenghui commented on a change in pull request #11139:
URL: https://github.com/apache/pulsar/pull/11139#discussion_r660397751
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
##########
@@ -796,4 +798,55 @@ public void testSetReplicatedSubscriptionStatus() {
Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
}
+ @Test
+ public void testGetMessageIdByTimestamp() throws Exception {
+ TenantInfoImpl tenantInfo = new
TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant("tenant-xyz", tenantInfo);
+ admin.namespaces().createNamespace("tenant-xyz/ns-abc",
Sets.newHashSet("test"));
+ final String topicName =
"persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestamp";
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ AtomicLong publishTime = new AtomicLong(0);
+ ProducerBase<byte[]> producer = (ProducerBase<byte[]>)
pulsarClient.newProducer().topic(topicName)
+ .enableBatching(false)
+ .intercept(new ProducerInterceptor() {
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public boolean eligible(Message message) {
+ return true;
+ }
+
+ @Override
+ public Message beforeSend(Producer producer, Message
message) {
+ return message;
+ }
+
+ @Override
+ public void onSendAcknowledgement(Producer producer,
Message message, MessageId msgId,
+ Throwable exception) {
+ publishTime.set(message.getPublishTime());
+ }
+ })
+ .create();
+
+ MessageId id1 = producer.send("test1".getBytes());
+ long publish1 = publishTime.get();
+
+ Thread.sleep(10);
Review comment:
Can this be replaced with Awaitility? Using sleep here will introduce a
flaky test.
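
A minimal sketch of the idea, in plain Java so it runs without extra dependencies: poll a condition with a timeout instead of sleeping for a fixed interval. The class `PollingWait` and its methods are hypothetical names, not part of the PR. It also assumes the intent of the sleep is to let the clock move past the first publish time, and that the publish time comes from the same clock. With Awaitility, the same wait would roughly be `Awaitility.await().until(() -> System.currentTimeMillis() > publish1)`.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper, for illustration only.
class PollingWait {
    /** Polls the condition until it holds or the timeout elapses; returns whether it held. */
    static boolean awaitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition becoming true
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    /** Waits (up to 5 s) until the wall clock is strictly past the given time in ms. */
    static boolean awaitClockPast(long publishTimeMs) {
        return awaitUntil(() -> System.currentTimeMillis() > publishTimeMs, 5_000, 1);
    }
}
```

In the test this would stand in for `Thread.sleep(10)`, for example `PollingWait.awaitClockPast(publish1)` before sending the second message, so the wait ends as soon as the timestamps can differ rather than after an arbitrary delay.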
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1515,6 +1515,39 @@ public void getMessageById(
}
}
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/messageid/{timestamp}")
+ @ApiOperation(value = "Get message id at or after this absolute timestamp
(in ms).")
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Don't have permission to
administrate resources on this tenant or"
+ + "subscriber is not authorized to access this operation"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic is not non-partitioned
and persistent"),
+ @ApiResponse(code = 412, message = "Topic name is not valid"),
+ @ApiResponse(code = 500, message = "Internal server error"),
+ @ApiResponse(code = 503, message = "Failed to validate global
cluster configuration")})
+ public void getMessageIdByTimestamp(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Specify the timestamp", required = true)
+ @PathParam("timestamp") long timestamp,
+ @ApiParam(value = "Is authentication required to perform this
operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetMessageIdByTimestamp(asyncResponse, timestamp,
authoritative);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
Review comment:
`internalGetMessageIdByTimestamp` can throw a RestException, so resuming
the asyncResponse with `new RestException(e)` would wrap one RestException
inside another, which will be a problem.
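
One way to avoid the double wrapping, sketched with a stand-in type so it is self-contained: pass an existing RestException through unchanged and wrap only other exceptions. `StubRestException` and `toResumable` are hypothetical. The stand-in assumes that wrapping loses the original status and yields a 500; the real `RestException` may behave differently.

```java
// Hypothetical stand-ins, for illustration only; not Pulsar's classes.
class RestExceptionUnwrap {
    /** Stand-in for RestException: a runtime exception carrying an HTTP status. */
    static class StubRestException extends RuntimeException {
        final int status;

        StubRestException(int status, String message) {
            super(message);
            this.status = status;
        }

        /** Assumption: wrapping an arbitrary cause maps to 500. */
        StubRestException(Throwable cause) {
            super(cause.getMessage(), cause);
            this.status = 500;
        }
    }

    /** Returns the exception to hand to asyncResponse.resume(...). */
    static StubRestException toResumable(Throwable e) {
        if (e instanceof StubRestException) {
            return (StubRestException) e; // keep the original status, e.g. 404
        }
        return new StubRestException(e); // wrap everything else once
    }
}
```

In the resource method, the equivalent would be a dedicated `catch (RestException e)` branch that resumes with `e` as is, ahead of the generic `catch (Exception e)`.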
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2328,6 +2330,61 @@ public void readEntryComplete(Entry entry, Object ctx) {
}
}
+ protected void internalGetMessageIdByTimestamp(AsyncResponse
asyncResponse, long timestamp, boolean authoritative) {
Review comment:
It would be better to return a `CompletableFuture<MessageIdImpl>` directly;
that makes the code easier to read. See
`PersistentTopics.setMaxSubscriptionsPerTopic` for an example.
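
A rough sketch of that shape, using stand-ins so it runs on its own: the internal method returns a future, and only the resource-layer method touches the response callback. Everything here is hypothetical: a `Consumer<Object>` plays the role of `AsyncResponse::resume`, a `String` replaces `MessageIdImpl`, and the lookup itself is a placeholder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

// Hypothetical stand-ins, for illustration only.
class FutureReturningLookup {
    /** Internal method: reports its result through the returned future. */
    static CompletableFuture<String> internalGetMessageIdByTimestamp(long timestamp) {
        if (timestamp < 0) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("negative timestamp"));
            return failed;
        }
        // Placeholder result; a real implementation would search the topic's entries.
        return CompletableFuture.completedFuture("messageId@" + timestamp);
    }

    /** Resource-layer method: the single place that resumes the response. */
    static void getMessageIdByTimestamp(Consumer<Object> resume, long timestamp) {
        internalGetMessageIdByTimestamp(timestamp)
                .thenAccept(resume)
                .exceptionally(ex -> {
                    // Dependent stages see failures wrapped in CompletionException.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    resume.accept(cause);
                    return null;
                });
    }
}
```

Keeping success and failure handling in one chain at the resource layer means the internal method no longer needs the `AsyncResponse` parameter at all.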