This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fcecca4ec00 [fix][test] fix testEndTxnWhenCommittingOrAborting flaky test (#18318)
fcecca4ec00 is described below

commit fcecca4ec00bbc1fc1958195997806be021bc409
Author: congbo <[email protected]>
AuthorDate: Sat Jan 28 13:31:25 2023 +0800

    [fix][test] fix testEndTxnWhenCommittingOrAborting flaky test (#18318)
---
 .../broker/service/PulsarCommandSenderImpl.java    |  4 ---
 .../broker/intercept/CounterBrokerInterceptor.java | 30 +++++++++++-----------
 .../pulsar/broker/transaction/TransactionTest.java |  6 ++---
 3 files changed, 18 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 6510da1fbe7..dd74fc4e71e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -34,7 +34,6 @@ import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.ServerError;
-import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -355,9 +354,6 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
         writeAndFlush(outBuf);
-        if (this.interceptor != null) {
-            this.interceptor.txnEnded(txnID.toString(), TxnAction.ABORT_VALUE);
-        }
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 6cda131a67e..a8cc3edd504 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -51,20 +51,20 @@ import org.eclipse.jetty.server.Response;
 @Slf4j
 public class CounterBrokerInterceptor implements BrokerInterceptor {
 
-    private AtomicInteger beforeSendCount = new AtomicInteger();
-    private AtomicInteger beforeSendCountAtConsumerLevel = new AtomicInteger();
-    private AtomicInteger count = new AtomicInteger();
-    private AtomicInteger connectionCreationCount = new AtomicInteger();
-    private AtomicInteger producerCount = new AtomicInteger();
-    private AtomicInteger consumerCount = new AtomicInteger();
-    private AtomicInteger messagePublishCount = new AtomicInteger();
-    private AtomicInteger messageCount = new AtomicInteger();
-    private AtomicInteger messageDispatchCount = new AtomicInteger();
-    private AtomicInteger messageAckCount = new AtomicInteger();
-    private AtomicInteger handleAckCount = new AtomicInteger();
-    private AtomicInteger txnCount = new AtomicInteger();
-    private AtomicInteger committedTxnCount = new AtomicInteger();
-    private AtomicInteger abortedTxnCount = new AtomicInteger();
+    private final AtomicInteger beforeSendCount = new AtomicInteger();
+    private final AtomicInteger beforeSendCountAtConsumerLevel = new AtomicInteger();
+    private final AtomicInteger count = new AtomicInteger();
+    private final AtomicInteger connectionCreationCount = new AtomicInteger();
+    private final AtomicInteger producerCount = new AtomicInteger();
+    private final AtomicInteger consumerCount = new AtomicInteger();
+    private final AtomicInteger messagePublishCount = new AtomicInteger();
+    private final AtomicInteger messageCount = new AtomicInteger();
+    private final AtomicInteger messageDispatchCount = new AtomicInteger();
+    private final AtomicInteger messageAckCount = new AtomicInteger();
+    private final AtomicInteger handleAckCount = new AtomicInteger();
+    private final AtomicInteger txnCount = new AtomicInteger();
+    private final AtomicInteger committedTxnCount = new AtomicInteger();
+    private final AtomicInteger abortedTxnCount = new AtomicInteger();
 
     public void reset() {
         beforeSendCount.set(0);
@@ -81,7 +81,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
         abortedTxnCount.set(0);
     }
 
-    private List<ResponseEvent> responseList = new ArrayList<>();
+    private final List<ResponseEvent> responseList = new ArrayList<>();
 
     @Data
     @AllArgsConstructor
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 0fcf9ff9b33..a568db3d9f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -920,11 +920,11 @@ public class TransactionTest extends TransactionTestBase {
         field.set(abortTxn, TransactionImpl.State.ABORTING);
 
 
-        assertEquals(((CounterBrokerInterceptor)listener).getTxnCount(),2);
+        Awaitility.await().untilAsserted(() -> assertEquals(listener.getTxnCount(),2));
         abortTxn.abort().get();
-        assertEquals(((CounterBrokerInterceptor)listener).getAbortedTxnCount(),1);
+        Awaitility.await().untilAsserted(() -> assertEquals(listener.getAbortedTxnCount(),1));
         commitTxn.commit().get();
-        assertEquals(((CounterBrokerInterceptor)listener).getCommittedTxnCount(),1);
+        Awaitility.await().untilAsserted(() -> assertEquals(listener.getCommittedTxnCount(),1));
     }
 
     @Test

Reply via email to