codelipenghui commented on a change in pull request #12523:
URL: https://github.com/apache/pulsar/pull/12523#discussion_r743373563
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1141,6 +1144,53 @@ public long getEstimatedBacklogSize() {
}
}
+ @Override
+ public long getEarliestMessagePublishTimeInBacklog() {
+ PositionImpl pos = getMarkDeletePositionOfSlowestConsumer();
+
+ return getEarliestMessagePublishTimeOfPos(pos);
+ }
+
+ public long getEarliestMessagePublishTimeOfPos(PositionImpl pos) {
+ if (pos == null) {
+ return 0L;
+ }
+ PositionImpl nextPos = getNextValidPosition(pos);
+
+ CompletableFuture<Long> future = new CompletableFuture<>();
+ asyncReadEntry(nextPos, new ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ ByteBuf metadataAndPayload = entry.getDataBuffer();
+ BrokerEntryMetadata brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(metadataAndPayload);
+ if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
+ future.complete(brokerEntryMetadata.getBrokerTimestamp());
+ } else {
+ MessageMetadata messageMetadata = Commands.parseMessageMetadata(metadataAndPayload);
+ if (messageMetadata.hasPublishTime()) {
+ future.complete(messageMetadata.getPublishTime());
+ } else {
+ future.complete(0L);
+ }
+ }
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+ }, null);
+
+ long result;
+ try {
+ result = future.get();
Review comment:
Is there a reason to call future.get() here? It blocks the request thread
while waiting for the entry read. This method could return a Future instead.
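A minimal sketch of the non-blocking shape suggested here, not the actual ManagedLedgerImpl code. `ReadCallback` and `asyncRead` are hypothetical stand-ins for `ReadEntryCallback` and `asyncReadEntry`, and a `Long` stands in for `PositionImpl`; the point is only that the method hands back the `CompletableFuture` instead of calling `get()` on it.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncPublishTimeSketch {

    /** Hypothetical stand-in for ReadEntryCallback. */
    public interface ReadCallback {
        void readComplete(long publishTime);
        void readFailed(Exception e);
    }

    /** Hypothetical stand-in for asyncReadEntry; it completes inline to keep the sketch runnable. */
    public static void asyncRead(long position, ReadCallback callback) {
        if (position < 0) {
            callback.readFailed(new IllegalArgumentException("invalid position " + position));
        } else {
            // Pretend the entry at this position carries this publish time.
            callback.readComplete(1_000L + position);
        }
    }

    /** Returns the future to the caller instead of blocking on future.get(). */
    public static CompletableFuture<Long> getEarliestPublishTimeAsync(Long position) {
        if (position == null) {
            return CompletableFuture.completedFuture(0L);
        }
        CompletableFuture<Long> future = new CompletableFuture<>();
        asyncRead(position, new ReadCallback() {
            @Override
            public void readComplete(long publishTime) {
                future.complete(publishTime);
            }

            @Override
            public void readFailed(Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        // The caller chains on the result rather than waiting for it.
        getEarliestPublishTimeAsync(5L)
                .thenAccept(t -> System.out.println("earliest publish time: " + t));
    }
}
```

With this shape the caller decides where, if anywhere, to wait for the result.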
##########
File path:
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -856,8 +858,13 @@ default void delete(String topic, boolean force) throws
PulsarAdminException {
* @throws PulsarAdminException
* Unexpected error
*/
- TopicStats getStats(String topic, boolean getPreciseBacklog,
- boolean subscriptionBacklogSize) throws
PulsarAdminException;
+ TopicStats getStats(String topic, boolean getPreciseBacklog, boolean
subscriptionBacklogSize,
Review comment:
We can introduce a new API, TopicStats getStats(String topic,
GetStatsOptions options);
so that we can add more params later without breaking the API.
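A rough sketch of what such an options object could look like. Everything here is an assumption for illustration: the `GetStatsOptions` fields simply mirror the flags in this diff, the fluent setters are one possible style, and a `String` stands in for `TopicStats`. The existing boolean overload is kept and delegates to the new method, so current callers are unaffected.

```java
public class GetStatsOptionsSketch {

    /** Hypothetical options holder; new flags can be added here without touching method signatures. */
    public static final class GetStatsOptions {
        private boolean getPreciseBacklog;
        private boolean subscriptionBacklogSize;
        private boolean getEarliestTimeInBacklog;

        public GetStatsOptions getPreciseBacklog(boolean value) {
            this.getPreciseBacklog = value;
            return this;
        }

        public GetStatsOptions subscriptionBacklogSize(boolean value) {
            this.subscriptionBacklogSize = value;
            return this;
        }

        public GetStatsOptions getEarliestTimeInBacklog(boolean value) {
            this.getEarliestTimeInBacklog = value;
            return this;
        }

        public boolean isGetPreciseBacklog() {
            return getPreciseBacklog;
        }

        public boolean isSubscriptionBacklogSize() {
            return subscriptionBacklogSize;
        }

        public boolean isGetEarliestTimeInBacklog() {
            return getEarliestTimeInBacklog;
        }
    }

    /** Simplified stand-in for the Topics admin interface; String replaces TopicStats. */
    public interface Topics {
        String getStats(String topic, GetStatsOptions options);

        // The existing signature stays and forwards to the options-based method.
        default String getStats(String topic, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
            return getStats(topic, new GetStatsOptions()
                    .getPreciseBacklog(getPreciseBacklog)
                    .subscriptionBacklogSize(subscriptionBacklogSize));
        }
    }

    /** Fake implementation that just echoes the options it received. */
    public static Topics fakeTopics() {
        return (topic, o) -> topic
                + " precise=" + o.isGetPreciseBacklog()
                + " subSize=" + o.isSubscriptionBacklogSize()
                + " earliest=" + o.isGetEarliestTimeInBacklog();
    }

    public static void main(String[] args) {
        Topics topics = fakeTopics();
        System.out.println(topics.getStats("my-topic", true, false));
        System.out.println(topics.getStats("my-topic",
                new GetStatsOptions().getEarliestTimeInBacklog(true)));
    }
}
```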
##########
File path:
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -563,10 +563,14 @@ void run() throws PulsarAdminException {
+ ", locking required.")
private boolean subscriptionBacklogSize = false;
+ @Parameter(names = { "-etb",
+ "--get-earliest-time-in-backlog" }, description = "Set true to
get earliest time in backlog")
Review comment:
```suggestion
"--get-backlog-in-mills" }, description = "Set true to get
earliest time in backlog")
```
##########
File path:
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -563,10 +563,14 @@ void run() throws PulsarAdminException {
+ ", locking required.")
private boolean subscriptionBacklogSize = false;
+ @Parameter(names = { "-etb",
+ "--get-earliest-time-in-backlog" }, description = "Set true to
get earliest time in backlog")
+ private boolean getEarliestTimeInBacklog = false;
Review comment:
Better to use a consistent name.
##########
File path:
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -856,8 +858,13 @@ default void delete(String topic, boolean force) throws
PulsarAdminException {
* @throws PulsarAdminException
* Unexpected error
*/
- TopicStats getStats(String topic, boolean getPreciseBacklog,
- boolean subscriptionBacklogSize) throws
PulsarAdminException;
+ TopicStats getStats(String topic, boolean getPreciseBacklog, boolean
subscriptionBacklogSize,
Review comment:
We can't change the API directly. That would introduce a compatibility
issue when users upgrade to the new version.
##########
File path:
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -856,8 +858,13 @@ default void delete(String topic, boolean force) throws
PulsarAdminException {
* @throws PulsarAdminException
* Unexpected error
*/
- TopicStats getStats(String topic, boolean getPreciseBacklog,
- boolean subscriptionBacklogSize) throws
PulsarAdminException;
+ TopicStats getStats(String topic, boolean getPreciseBacklog, boolean
subscriptionBacklogSize,
Review comment:
No. All of the getStats methods are public API, and users may already call
them in their applications. If we change any of them, those users will
encounter compatibility issues.
```java
TopicStats getStats(String topic, boolean getPreciseBacklog,
                    boolean subscriptionBacklogSize) throws PulsarAdminException;

default TopicStats getStats(String topic, boolean getPreciseBacklog) throws PulsarAdminException {
    return getStats(topic, getPreciseBacklog, false);
}

default TopicStats getStats(String topic) throws PulsarAdminException {
    return getStats(topic, false, false);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]