This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 882d56b466e9a910f004dcdc38873f3cda73b3a7 Author: Baodi Shi <[email protected]> AuthorDate: Mon Jul 25 14:05:27 2022 +0800 [fix][flaky-test] BrokerInterceptorTest.testProducerCreation (#16742) (cherry picked from commit bc94643bc1a7f365dfb75e389ac3e1156770a119) --- .../broker/intercept/CounterBrokerInterceptor.java | 66 +++++++++++++--------- 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java index da4d4f01631..930bebf5e47 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.FilterChain; import javax.servlet.ServletException; import javax.servlet.ServletRequest; @@ -47,14 +48,25 @@ import org.eclipse.jetty.server.Response; @Slf4j public class CounterBrokerInterceptor implements BrokerInterceptor { - int beforeSendCount = 0; - int count = 0; - int connectionCreationCount = 0; - int producerCount = 0; - int consumerCount = 0; - int messageCount = 0; - int messageDispatchCount = 0; - int messageAckCount = 0; + private AtomicInteger beforeSendCount = new AtomicInteger(); + private AtomicInteger count = new AtomicInteger(); + private AtomicInteger connectionCreationCount = new AtomicInteger(); + private AtomicInteger producerCount = new AtomicInteger(); + private AtomicInteger consumerCount = new AtomicInteger(); + private AtomicInteger messageCount = new AtomicInteger(); + private AtomicInteger messageDispatchCount = new AtomicInteger(); + private AtomicInteger messageAckCount = new AtomicInteger(); + + public void reset() { + beforeSendCount.set(0); + count.set(0); + connectionCreationCount.set(0); + producerCount.set(0); + consumerCount.set(0); + messageCount.set(0); + messageDispatchCount.set(0); + messageAckCount.set(0); + } private List<ResponseEvent> responseList = new ArrayList<>(); @@ -70,7 +82,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { if (log.isDebugEnabled()) { log.debug("Connection created {}", cnx); } - connectionCreationCount++; + connectionCreationCount.incrementAndGet(); } @Override @@ -80,7 +92,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { log.debug("Producer created with name={}, id={}", producer.getProducerName(), producer.getProducerId()); } - producerCount++; + producerCount.incrementAndGet(); } @Override @@ -91,7 +103,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { log.debug("Consumer created with name={}, id={}", consumer.consumerName(), consumer.consumerId()); } - consumerCount++; + consumerCount.incrementAndGet(); } @Override @@ -102,7 +114,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { log.debug("Message published topic={}, producer={}", producer.getTopic().getName(), producer.getProducerName()); } - messageCount++; + messageCount.incrementAndGet(); } @Override @@ -112,13 +124,13 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { log.debug("Message dispatched topic={}, consumer={}", consumer.getSubscription().getTopic().getName(), consumer.consumerName()); } - messageDispatchCount++; + messageDispatchCount.incrementAndGet(); } @Override public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ack) { - messageAckCount++; + messageAckCount.incrementAndGet(); } @Override @@ -130,7 +142,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { log.debug("Send message to topic {}, subscription {}", subscription.getTopic(), subscription.getName()); } - beforeSendCount++; + beforeSendCount.incrementAndGet(); } @Override @@ -138,7 +150,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { if (log.isDebugEnabled()) { log.debug("[{}] On [{}] Pulsar command", count, command.getType().name()); } - count ++; + count.incrementAndGet(); } @Override @@ -148,7 +160,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { @Override public void onWebserviceRequest(ServletRequest request) { - count ++; + count.incrementAndGet(); if (log.isDebugEnabled()) { log.debug("[{}] On [{}] Webservice request", count, ((HttpServletRequest) request).getRequestURL().toString()); } @@ -156,7 +168,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { @Override public void onWebserviceResponse(ServletRequest request, ServletResponse response) { - count ++; + count.incrementAndGet(); if (log.isDebugEnabled()) { log.debug("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest) request).getRequestURL().toString(), response); } @@ -169,7 +181,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { @Override public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { - count = 100; + count.set(100); chain.doFilter(request, response); } @@ -184,35 +196,35 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { } public int getCount() { - return count; + return count.get(); } public int getProducerCount() { - return producerCount; + return producerCount.get(); } public int getConsumerCount() { - return consumerCount; + return consumerCount.get(); } public int getMessagePublishCount() { - return messageCount; + return messageCount.get(); } public int getMessageDispatchCount() { - return messageDispatchCount; + return messageDispatchCount.get(); } public int getMessageAckCount() { - return messageAckCount; + return messageAckCount.get(); } public int getBeforeSendCount() { - return beforeSendCount; + return beforeSendCount.get(); } public int getConnectionCreationCount() { - return connectionCreationCount; + return connectionCreationCount.get(); } public void clearResponseList() {
