This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new bd2644e4a04 [fix][admin] Report earliest msg in partitioned backlog
(#19465)
bd2644e4a04 is described below
commit bd2644e4a0414cd0e26429c95369549c8340d2c9
Author: Elliot West <[email protected]>
AuthorDate: Thu Jun 15 04:43:24 2023 +0100
[fix][admin] Report earliest msg in partitioned backlog (#19465)
Co-authored-by: tison <[email protected]>
---
.../apache/pulsar/broker/admin/AdminApi2Test.java | 16 ++++-
.../org/apache/pulsar/client/admin/Topics.java | 2 +
.../policies/data/stats/SubscriptionStatsImpl.java | 12 ++++
.../common/policies/data/stats/TopicStatsImpl.java | 12 ++++
.../data/stats/SubscriptionStatsImplTest.java | 82 ++++++++++++++++++++++
.../policies/data/stats/TopicStatsImplTest.java | 81 +++++++++++++++++++++
6 files changed, 204 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 8d94da94856..ae8f3fdd54b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;
+import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -36,6 +37,7 @@ import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -1590,6 +1592,8 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscribe();
+ long start1 = 0;
+ long start2 = 0;
@Cleanup
Producer<byte[]> producer = client.newProducer()
.topic(topic)
@@ -1597,6 +1601,12 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
.create();
for (int i = 0; i < 10; i++) {
+ if (i == 0) {
+ start1 = Clock.systemUTC().millis();
+ }
+ if (i == 5) {
+ start2 = Clock.systemUTC().millis();
+ }
if (i > 4) {
producer.newMessage()
.value("message-1".getBytes(StandardCharsets.UTF_8))
@@ -1607,22 +1617,26 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
}
}
// wait until the message add to delay queue.
+ long finalStart1 = start1;
Awaitility.await().untilAsserted(() -> {
TopicStats topicStats = admin.topics().getPartitionedStats(topic,
false, true, true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 440);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(),
5);
+
assertTrue(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog()
>= finalStart1);
});
for (int i = 0; i < 5; i++) {
consumer.acknowledge(consumer.receive());
}
// Wait the ack send.
- Awaitility.await().untilAsserted(() -> {
+ long finalStart2 = start2;
+ Awaitility.await().timeout(1, MINUTES).untilAsserted(() -> {
TopicStats topicStats2 = admin.topics().getPartitionedStats(topic,
false, true, true, true);
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 5);
assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 223);
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklogNoDelayed(),
0);
+
assertTrue(topicStats2.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog()
>= finalStart2);
});
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 7e88b3fbd14..7b4f0ba6eee 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1325,6 +1325,8 @@ public interface Topics {
* Set to true to get precise backlog, Otherwise get imprecise
backlog.
* @param subscriptionBacklogSize
* Whether to get backlog size for each subscription.
+ * @param getEarliestTimeInBacklog
+ * Whether to get the earliest time in backlog.
* @return a future that can be used to track when the partitioned topic
statistics are returned
*/
CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index 1301ebc6994..ffea8407393 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -173,6 +173,7 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
nonContiguousDeletedMessagesRanges = 0;
nonContiguousDeletedMessagesRangesSerializedSize = 0;
delayedTrackerMemoryUsage = 0;
+ earliestMsgPublishTimeInBacklog = 0L;
subscriptionProperties.clear();
filterProcessedMsgCount = 0;
filterAcceptedMsgCount = 0;
@@ -216,6 +217,17 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
this.nonContiguousDeletedMessagesRanges +=
stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize +=
stats.nonContiguousDeletedMessagesRangesSerializedSize;
this.delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;
+ if (this.earliestMsgPublishTimeInBacklog != 0 &&
stats.earliestMsgPublishTimeInBacklog != 0) {
+ this.earliestMsgPublishTimeInBacklog = Math.min(
+ this.earliestMsgPublishTimeInBacklog,
+ stats.earliestMsgPublishTimeInBacklog
+ );
+ } else {
+ this.earliestMsgPublishTimeInBacklog = Math.max(
+ this.earliestMsgPublishTimeInBacklog,
+ stats.earliestMsgPublishTimeInBacklog
+ );
+ }
this.subscriptionProperties.putAll(stats.subscriptionProperties);
this.filterProcessedMsgCount += stats.filterProcessedMsgCount;
this.filterAcceptedMsgCount += stats.filterAcceptedMsgCount;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 49f88392417..1f26c58bb37 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -208,6 +208,7 @@ public class TopicStatsImpl implements TopicStats {
this.lastOffloadFailureTimeStamp = 0;
this.lastOffloadSuccessTimeStamp = 0;
this.publishRateLimitedTimes = 0L;
+ this.earliestMsgPublishTimeInBacklogs = 0L;
this.delayedMessageIndexSizeInBytes = 0;
this.compaction.reset();
}
@@ -297,6 +298,17 @@ public class TopicStatsImpl implements TopicStats {
}
}
}
+ if (earliestMsgPublishTimeInBacklogs != 0 && ((TopicStatsImpl)
ts).earliestMsgPublishTimeInBacklogs != 0) {
+ earliestMsgPublishTimeInBacklogs = Math.min(
+ earliestMsgPublishTimeInBacklogs,
+ ((TopicStatsImpl) ts).earliestMsgPublishTimeInBacklogs
+ );
+ } else {
+ earliestMsgPublishTimeInBacklogs = Math.max(
+ earliestMsgPublishTimeInBacklogs,
+ ((TopicStatsImpl) ts).earliestMsgPublishTimeInBacklogs
+ );
+ }
return this;
}
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImplTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImplTest.java
new file mode 100644
index 00000000000..8a4b5da9edd
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImplTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data.stats;
+
+import static org.testng.Assert.assertEquals;
+import org.testng.annotations.Test;
+
+public class SubscriptionStatsImplTest {
+ // Exercises reset() and add() with respect to the earliestMsgPublishTimeInBacklog field only.
+ @Test
+ public void testReset() {
+ SubscriptionStatsImpl stats = new SubscriptionStatsImpl();
+ stats.earliestMsgPublishTimeInBacklog = 1L;
+ stats.reset(); // expected to set the field back to 0
+ assertEquals(stats.earliestMsgPublishTimeInBacklog, 0L);
+
+ }
+
+ @Test
+ public void testAdd_EarliestMsgPublishTimeInBacklogs_Earliest() {
+ SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl();
+ stats1.earliestMsgPublishTimeInBacklog = 10L;
+
+ SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl();
+ stats2.earliestMsgPublishTimeInBacklog = 20L;
+ // Both operands are non-zero: the result of add() should carry the smaller value.
+ SubscriptionStatsImpl aggregate = stats1.add(stats2);
+ assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 10L);
+ }
+
+ @Test
+ public void testAdd_EarliestMsgPublishTimeInBacklogs_First0() {
+ SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl();
+ stats1.earliestMsgPublishTimeInBacklog = 0L;
+
+ SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl();
+ stats2.earliestMsgPublishTimeInBacklog = 20L;
+ // Receiver holds 0: the result of add() should carry the argument's non-zero value.
+ SubscriptionStatsImpl aggregate = stats1.add(stats2);
+ assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 20L);
+ }
+
+ @Test
+ public void testAdd_EarliestMsgPublishTimeInBacklogs_Second0() {
+ SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl();
+ stats1.earliestMsgPublishTimeInBacklog = 10L;
+
+ SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl();
+ stats2.earliestMsgPublishTimeInBacklog = 0L;
+ // Argument holds 0: the result of add() should keep the receiver's non-zero value.
+ SubscriptionStatsImpl aggregate = stats1.add(stats2);
+ assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 10L);
+ }
+
+ @Test
+ public void testAdd_EarliestMsgPublishTimeInBacklogs_Zero() {
+ SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl();
+ stats1.earliestMsgPublishTimeInBacklog = 0L;
+
+ SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl();
+ stats2.earliestMsgPublishTimeInBacklog = 0L;
+ // Both operands hold 0: the result of add() should stay 0.
+ SubscriptionStatsImpl aggregate = stats1.add(stats2);
+ assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 0L);
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImplTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImplTest.java
new file mode 100644
index 00000000000..09cef4c4d0f
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImplTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data.stats;
+
+import static org.testng.Assert.assertEquals;
+import org.testng.annotations.Test;
+
+public class TopicStatsImplTest {
+ // Exercises reset() and add() with respect to the earliestMsgPublishTimeInBacklogs field only.
+ @Test
+ public void testReset() {
+ TopicStatsImpl stats = new TopicStatsImpl();
+ stats.earliestMsgPublishTimeInBacklogs = 1L;
+ stats.reset(); // expected to set the field back to 0
+ assertEquals(stats.earliestMsgPublishTimeInBacklogs, 0L);
+ }
+
+ @Test
+ public void testAdd_EarliestMsgPublishTimeInBacklogs_Earliest() {
+ TopicStatsImpl stats1 = new TopicStatsImpl();
+ stats1.earliestMsgPublishTimeInBacklogs = 10L;
+
+ TopicStatsImpl stats2 = new TopicStatsImpl();
+ stats2.earliestMsgPublishTimeInBacklogs = 20L;
+ // Both operands are non-zero: the result of add() should carry the smaller value.
+ TopicStatsImpl aggregate = stats1.add(stats2);
+ assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 10L);
+ }
+
+ @Test
+ public void testAdd_EarliestMsgPublishTimeInBacklogs_First0() {
+ TopicStatsImpl stats1 = new TopicStatsImpl();
+ stats1.earliestMsgPublishTimeInBacklogs = 0L;
+
+ TopicStatsImpl stats2 = new TopicStatsImpl();
+ stats2.earliestMsgPublishTimeInBacklogs = 20L;
+ // Receiver holds 0: the result of add() should carry the argument's non-zero value.
+ TopicStatsImpl aggregate = stats1.add(stats2);
+ assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 20L);
+ }
+
+ @Test
+ public void testAdd_EarliestMsgPublishTimeInBacklogs_Second0() {
+ TopicStatsImpl stats1 = new TopicStatsImpl();
+ stats1.earliestMsgPublishTimeInBacklogs = 10L;
+
+ TopicStatsImpl stats2 = new TopicStatsImpl();
+ stats2.earliestMsgPublishTimeInBacklogs = 0L;
+ // Argument holds 0: the result of add() should keep the receiver's non-zero value.
+ TopicStatsImpl aggregate = stats1.add(stats2);
+ assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 10L);
+ }
+
+ @Test
+ public void testAdd_EarliestMsgPublishTimeInBacklogs_Zero() {
+ TopicStatsImpl stats1 = new TopicStatsImpl();
+ stats1.earliestMsgPublishTimeInBacklogs = 0L;
+
+ TopicStatsImpl stats2 = new TopicStatsImpl();
+ stats2.earliestMsgPublishTimeInBacklogs = 0L;
+ // Both operands hold 0: the result of add() should stay 0.
+ TopicStatsImpl aggregate = stats1.add(stats2);
+ assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 0L);
+ }
+}
\ No newline at end of file