This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 21e56955d5e [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)
21e56955d5e is described below

commit 21e56955d5e9e804b88fbc0d599b6fb172c4f5ff
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Mar 12 22:26:34 2024 +0200

    [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)
    
    (cherry picked from commit 43f9d2abb9d5cd788fe18da6af7ad6fbfb3bc07b)
---
 .../RGUsageMTAggrWaitForAllMsgsTest.java           | 30 +++++++++++-----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 9bf7e3c5325..54c23cacc0d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -20,6 +20,10 @@ package org.apache.pulsar.broker.resourcegroup;
 
 import com.google.common.collect.Sets;
 import io.prometheus.client.Summary;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
@@ -45,11 +49,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 
 // The tests implement a set of producer/consumer operations on a set of topics.
 // [A thread is started for each producer, and each consumer in the test.]
@@ -57,6 +56,7 @@ import java.util.concurrent.TimeUnit;
 // After sending/receiving all the messages, traffic usage statistics, and Prometheus-metrics
 // are verified on the RGs.
 @Slf4j
+@Test(groups = "flaky")
 public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
     @BeforeClass
     @Override
@@ -119,9 +119,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
         private final int numMesgsToProduce;
         private final String myProduceTopic;
 
-        private int sentNumBytes = 0;
-        private int sentNumMsgs = 0;
-        private int numExceptions = 0;
+        private volatile int sentNumBytes = 0;
+        private volatile int sentNumMsgs = 0;
+        private volatile int numExceptions = 0;
 
         ProduceMessages(int prodId, int nMesgs, String[] topics) {
             producerId = prodId;
@@ -202,9 +202,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
 
         private final int recvTimeoutMilliSecs = 1000;
         private final int ackTimeoutMilliSecs = 1100; // has to be more than 1 
second
-        private int recvdNumBytes = 0;
-        private int recvdNumMsgs = 0;
-        private int numExceptions = 0;
+        private volatile int recvdNumBytes = 0;
+        private volatile int recvdNumMsgs = 0;
+        private volatile int numExceptions = 0;
         private volatile boolean allMessagesReceived = false;
         private volatile boolean consumerIsReady = false;
 
@@ -494,15 +494,15 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
         while (numConsumersDone < NUM_CONSUMERS) {
             for (int ix = 0; ix < NUM_CONSUMERS; ix++) {
                 if (!joinedConsumers[ix]) {
+                    consThr[ix].thread.join();
+                    joinedConsumers[ix] = true;
+                    log.debug("Joined consumer={}", ix);
+
                     recvdBytes = consThr[ix].consumer.getNumBytesRecvd();
                     recvdMsgs = consThr[ix].consumer.getNumMessagesRecvd();
                     numConsumerExceptions += 
consThr[ix].consumer.getNumExceptions();
                     log.debug("Consumer={} received {} mesgs and {} bytes", 
ix, recvdMsgs, recvdBytes);
 
-                    consThr[ix].thread.join();
-                    joinedConsumers[ix] = true;
-                    log.debug("Joined consumer={}", ix);
-
                     recvdNumBytes += recvdBytes;
                     recvdNumMsgs += recvdMsgs;
                     numConsumersDone++;

Reply via email to