This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 21e56955d5e [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest
(#22252)
21e56955d5e is described below
commit 21e56955d5e9e804b88fbc0d599b6fb172c4f5ff
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Mar 12 22:26:34 2024 +0200
[fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)
(cherry picked from commit 43f9d2abb9d5cd788fe18da6af7ad6fbfb3bc07b)
---
.../RGUsageMTAggrWaitForAllMsgsTest.java | 30 +++++++++++-----------
1 file changed, 15 insertions(+), 15 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 9bf7e3c5325..54c23cacc0d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -20,6 +20,10 @@ package org.apache.pulsar.broker.resourcegroup;
import com.google.common.collect.Sets;
import io.prometheus.client.Summary;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
@@ -45,11 +49,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
// The tests implement a set of producer/consumer operations on a set of
topics.
// [A thread is started for each producer, and each consumer in the test.]
@@ -57,6 +56,7 @@ import java.util.concurrent.TimeUnit;
// After sending/receiving all the messages, traffic usage statistics, and
Prometheus-metrics
// are verified on the RGs.
@Slf4j
+@Test(groups = "flaky")
public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
@BeforeClass
@Override
@@ -119,9 +119,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends
ProducerConsumerBase {
private final int numMesgsToProduce;
private final String myProduceTopic;
- private int sentNumBytes = 0;
- private int sentNumMsgs = 0;
- private int numExceptions = 0;
+ private volatile int sentNumBytes = 0;
+ private volatile int sentNumMsgs = 0;
+ private volatile int numExceptions = 0;
ProduceMessages(int prodId, int nMesgs, String[] topics) {
producerId = prodId;
@@ -202,9 +202,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends
ProducerConsumerBase {
private final int recvTimeoutMilliSecs = 1000;
private final int ackTimeoutMilliSecs = 1100; // has to be more than 1
second
- private int recvdNumBytes = 0;
- private int recvdNumMsgs = 0;
- private int numExceptions = 0;
+ private volatile int recvdNumBytes = 0;
+ private volatile int recvdNumMsgs = 0;
+ private volatile int numExceptions = 0;
private volatile boolean allMessagesReceived = false;
private volatile boolean consumerIsReady = false;
@@ -494,15 +494,15 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends
ProducerConsumerBase {
while (numConsumersDone < NUM_CONSUMERS) {
for (int ix = 0; ix < NUM_CONSUMERS; ix++) {
if (!joinedConsumers[ix]) {
+ consThr[ix].thread.join();
+ joinedConsumers[ix] = true;
+ log.debug("Joined consumer={}", ix);
+
recvdBytes = consThr[ix].consumer.getNumBytesRecvd();
recvdMsgs = consThr[ix].consumer.getNumMessagesRecvd();
numConsumerExceptions +=
consThr[ix].consumer.getNumExceptions();
log.debug("Consumer={} received {} mesgs and {} bytes",
ix, recvdMsgs, recvdBytes);
- consThr[ix].thread.join();
- joinedConsumers[ix] = true;
- log.debug("Joined consumer={}", ix);
-
recvdNumBytes += recvdBytes;
recvdNumMsgs += recvdMsgs;
numConsumersDone++;