codelipenghui commented on a change in pull request #12523:
URL: https://github.com/apache/pulsar/pull/12523#discussion_r743373563



##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1141,6 +1144,53 @@ public long getEstimatedBacklogSize() {
         }
     }
 
+    @Override
+    public long getEarliestMessagePublishTimeInBacklog() {
+        PositionImpl pos = getMarkDeletePositionOfSlowestConsumer();
+
+        return getEarliestMessagePublishTimeOfPos(pos);
+    }
+
+    public long getEarliestMessagePublishTimeOfPos(PositionImpl pos) {
+        if (pos == null) {
+            return 0L;
+        }
+        PositionImpl nextPos = getNextValidPosition(pos);
+
+        CompletableFuture<Long> future = new CompletableFuture<>();
+        asyncReadEntry(nextPos, new ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                ByteBuf metadataAndPayload = entry.getDataBuffer();
+                BrokerEntryMetadata brokerEntryMetadata = 
Commands.parseBrokerEntryMetadataIfExist(metadataAndPayload);
+                if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasBrokerTimestamp()) {
+                    future.complete(brokerEntryMetadata.getBrokerTimestamp());
+                } else {
+                    MessageMetadata messageMetadata = 
Commands.parseMessageMetadata(metadataAndPayload);
+                    if (messageMetadata.hasPublishTime()) {
+                        future.complete(messageMetadata.getPublishTime());
+                    } else {
+                        future.complete(0L);
+                    }
+                }
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+                future.completeExceptionally(exception);
+            }
+        }, null);
+
+        long result;
+        try {
+            result = future.get();

Review comment:
       Any reason to call future.get() here? We can return a Future for this 
method, this will block the request thread for waiting the entry read.
   

##########
File path: 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -856,8 +858,13 @@ default void delete(String topic, boolean force) throws 
PulsarAdminException {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    TopicStats getStats(String topic, boolean getPreciseBacklog,
-                        boolean subscriptionBacklogSize) throws 
PulsarAdminException;
+    TopicStats getStats(String topic, boolean getPreciseBacklog, boolean 
subscriptionBacklogSize,

Review comment:
       We can introduce a new API TopicStats getStats(String topic, 
GetStatsOptions options);
   So can we can introduce more params without breaking the API.

##########
File path: 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -563,10 +563,14 @@ void run() throws PulsarAdminException {
         + ", locking required.")
         private boolean subscriptionBacklogSize = false;
 
+        @Parameter(names = { "-etb",
+                "--get-earliest-time-in-backlog" }, description = "Set true to 
get earliest time in backlog")

Review comment:
       ```suggestion
                   "--get-backlog-in-mills" }, description = "Set true to get 
earliest time in backlog")
   ```

##########
File path: 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -563,10 +563,14 @@ void run() throws PulsarAdminException {
         + ", locking required.")
         private boolean subscriptionBacklogSize = false;
 
+        @Parameter(names = { "-etb",
+                "--get-earliest-time-in-backlog" }, description = "Set true to 
get earliest time in backlog")
+        private boolean getEarliestTimeInBacklog = false;

Review comment:
       Better to use a consistent name.

##########
File path: 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -856,8 +858,13 @@ default void delete(String topic, boolean force) throws 
PulsarAdminException {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    TopicStats getStats(String topic, boolean getPreciseBacklog,
-                        boolean subscriptionBacklogSize) throws 
PulsarAdminException;
+    TopicStats getStats(String topic, boolean getPreciseBacklog, boolean 
subscriptionBacklogSize,

Review comment:
       We can't change the API directly, this will introduce a compatibility 
issue while users upgrade to the new version.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to