This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fcecca4ec00 [fix][test] fix testEndTxnWhenCommittingOrAborting flaky
test (#18318)
fcecca4ec00 is described below
commit fcecca4ec00bbc1fc1958195997806be021bc409
Author: congbo <[email protected]>
AuthorDate: Sat Jan 28 13:31:25 2023 +0800
[fix][test] fix testEndTxnWhenCommittingOrAborting flaky test (#18318)
---
.../broker/service/PulsarCommandSenderImpl.java | 4 ---
.../broker/intercept/CounterBrokerInterceptor.java | 30 +++++++++++-----------
.../pulsar/broker/transaction/TransactionTest.java | 6 ++---
3 files changed, 18 insertions(+), 22 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 6510da1fbe7..dd74fc4e71e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -34,7 +34,6 @@ import
org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
-import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -355,9 +354,6 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
writeAndFlush(outBuf);
- if (this.interceptor != null) {
- this.interceptor.txnEnded(txnID.toString(), TxnAction.ABORT_VALUE);
- }
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 6cda131a67e..a8cc3edd504 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -51,20 +51,20 @@ import org.eclipse.jetty.server.Response;
@Slf4j
public class CounterBrokerInterceptor implements BrokerInterceptor {
- private AtomicInteger beforeSendCount = new AtomicInteger();
- private AtomicInteger beforeSendCountAtConsumerLevel = new AtomicInteger();
- private AtomicInteger count = new AtomicInteger();
- private AtomicInteger connectionCreationCount = new AtomicInteger();
- private AtomicInteger producerCount = new AtomicInteger();
- private AtomicInteger consumerCount = new AtomicInteger();
- private AtomicInteger messagePublishCount = new AtomicInteger();
- private AtomicInteger messageCount = new AtomicInteger();
- private AtomicInteger messageDispatchCount = new AtomicInteger();
- private AtomicInteger messageAckCount = new AtomicInteger();
- private AtomicInteger handleAckCount = new AtomicInteger();
- private AtomicInteger txnCount = new AtomicInteger();
- private AtomicInteger committedTxnCount = new AtomicInteger();
- private AtomicInteger abortedTxnCount = new AtomicInteger();
+ private final AtomicInteger beforeSendCount = new AtomicInteger();
+ private final AtomicInteger beforeSendCountAtConsumerLevel = new
AtomicInteger();
+ private final AtomicInteger count = new AtomicInteger();
+ private final AtomicInteger connectionCreationCount = new AtomicInteger();
+ private final AtomicInteger producerCount = new AtomicInteger();
+ private final AtomicInteger consumerCount = new AtomicInteger();
+ private final AtomicInteger messagePublishCount = new AtomicInteger();
+ private final AtomicInteger messageCount = new AtomicInteger();
+ private final AtomicInteger messageDispatchCount = new AtomicInteger();
+ private final AtomicInteger messageAckCount = new AtomicInteger();
+ private final AtomicInteger handleAckCount = new AtomicInteger();
+ private final AtomicInteger txnCount = new AtomicInteger();
+ private final AtomicInteger committedTxnCount = new AtomicInteger();
+ private final AtomicInteger abortedTxnCount = new AtomicInteger();
public void reset() {
beforeSendCount.set(0);
@@ -81,7 +81,7 @@ public class CounterBrokerInterceptor implements
BrokerInterceptor {
abortedTxnCount.set(0);
}
- private List<ResponseEvent> responseList = new ArrayList<>();
+ private final List<ResponseEvent> responseList = new ArrayList<>();
@Data
@AllArgsConstructor
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 0fcf9ff9b33..a568db3d9f1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -920,11 +920,11 @@ public class TransactionTest extends TransactionTestBase {
field.set(abortTxn, TransactionImpl.State.ABORTING);
- assertEquals(((CounterBrokerInterceptor)listener).getTxnCount(),2);
+ Awaitility.await().untilAsserted(() ->
assertEquals(listener.getTxnCount(),2));
abortTxn.abort().get();
-
assertEquals(((CounterBrokerInterceptor)listener).getAbortedTxnCount(),1);
+ Awaitility.await().untilAsserted(() ->
assertEquals(listener.getAbortedTxnCount(),1));
commitTxn.commit().get();
-
assertEquals(((CounterBrokerInterceptor)listener).getCommittedTxnCount(),1);
+ Awaitility.await().untilAsserted(() ->
assertEquals(listener.getCommittedTxnCount(),1));
}
@Test