This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-e2e.git
The following commit(s) were added to refs/heads/master by this push:
new 364fc9a [ISSUE#43]Add 4 features under simple directory (#44)
364fc9a is described below
commit 364fc9a2fdd4d7de6b7abb857f889f7069299ff2
Author: alani <[email protected]>
AuthorDate: Tue Jun 20 10:46:08 2023 +0800
[ISSUE#43]Add 4 features under simple directory (#44)
* features
* Delete ConsumerGroup.feature
* features
* Delete bdd/src/main/resources/consumer directory
* features
* features
* finish consumer and producer features in client, 2 sql filter features
and a tag filter feature
* fix a problem in SimpleConsumerInitTest.java
* fix some problems in features
* update
* Add 4 features of SimpleConsumers scenarios
* Add 4 features of SimpleConsumers scenarios
---------
Co-authored-by: alani <[email protected]>
---
.../org/apache/rocketmq/ClientInitStepdefs.java | 174 +++++++++++++++++++--
.../server/abnormal/PushConsumerRetry.feature | 52 ++++++
bdd/src/main/resources/simple/SimpleAck.feature | 38 +++++
.../main/resources/simple/SimpleMsgType.feature | 52 ++++++
.../main/resources/simple/SimpleOrderParam.feature | 67 ++++++++
.../main/resources/simple/SimpleParameter.feature | 103 ++++++++++++
.../broker/simple/SimpleOrderParamTest.java | 1 +
7 files changed, 471 insertions(+), 16 deletions(-)
diff --git a/bdd/src/main/java/org/apache/rocketmq/ClientInitStepdefs.java
b/bdd/src/main/java/org/apache/rocketmq/ClientInitStepdefs.java
index 1e46a77..972004f 100644
--- a/bdd/src/main/java/org/apache/rocketmq/ClientInitStepdefs.java
+++ b/bdd/src/main/java/org/apache/rocketmq/ClientInitStepdefs.java
@@ -179,7 +179,8 @@ public class ClientInitStepdefs {
@And("Create a message, including the Topic\\({string}), Tag\\({string}),
Body\\({string}), deliveryTimestamp\\({string})")
public void createAMessageIncludingTheTopicTagBodyDeliveryTimestamp(String
arg0, String arg1, String arg2, String arg3) {
-
+
+
}
@And("Create a message, including the Topic\\({string}), Tag\\({string}),
Body\\({string}), messageGroup\\({string})")
@@ -269,21 +270,6 @@ public class ClientInitStepdefs {
}
- @Then("Check SimpleConsumer pull a message once")
- public void checkSimpleConsumerPullAMessageOnce() {
-
- }
-
- @And("SimpleConsumer invokes receive method {string} and returns acks
{string}")
- public void simpleconsumerInvokesReceiveMethodAndReturnsAcks(String arg0,
String arg1) {
-
- }
-
- @Then("Check all messages are pulled by SimpleConsumer {string}")
- public void checkAllMessagesArePulledBySimpleConsumer(String arg0) {
-
- }
-
@When("Create a {string}, set the
ClientConfiguration\\(Endpoint:{string}), ConsumerGroup\\({string}),
AwaitDuration\\({string}), SubscriptionExpressions\\(NULL)")
public void
createASetTheClientConfigurationEndpointConsumerGroupAwaitDurationSubscriptionExpressionsNULL(String
arg0, String arg1, String arg2, String arg3) {
@@ -417,4 +403,160 @@ public class ClientInitStepdefs {
@When("Create a {string}, set the
ClientConfiguration\\(Endpoint:{string}), and MaxAttempts\\({int})")
public void createASetTheClientConfigurationEndpointAndMaxAttempts(String
arg0, String arg1, int arg2) {
}
+
+ @Then("Check only the first message that can be pulled by SimpleConsumer")
+ public void checkOnlyTheFirstMessageThatCanBePulledBySimpleConsumer() {
+ }
+
+ @And("Create {string} messages, including the Topic\\({string}),
Tag\\({string}), Key\\({string}), MessageGroup\\({string}), and
Body\\({string})")
+ public void
createMessagesIncludingTheTopicTagKeyMessageGroupAndBody(String arg0, String
arg1, String arg2, String arg3, String arg4, String arg5) {
+
+ }
+
+ @And("Check the pulled messages that can be retried except the first one")
+ public void checkThePulledMessagesThatCanBeRetriedExceptTheFirstOne() {
+
+ }
+
+ @And("Check the number of retried messages equals to {int}")
+ public void checkTheNumberOfRetriedMessagesEqualsTo(int arg0) {
+ }
+
+ @And("SimpleConsumer invokes the method receive\\(maxMessageNum:{string},
invisibleDuration:{string}) {string}")
+ public void
simpleconsumerInvokesReceiveMaxMessageNumInvisibleDuration(String arg0, String
arg1, String arg2) {
+
+ }
+
+
+ @Then("Check the duration between each two retrying consumptions equals to
{int}s")
+ public void
checkTheDurationBetweenEachTwoRetryingConsumptionsEqualsToS(int arg0) {
+
+ }
+
+ @And("SimpleConsumer returns an ack within {string}")
+ public void simpleconsumerReturnsAnAckWithin(String arg0) {
+
+ }
+
+ @And("Set a failed assertion with {string}")
+ public void setAFailedAssertionWith(String arg0) {
+
+ }
+
+ @And("Check a new {string} messages are pulled {string} and acked {string}
by SimpleConsumer within {int}s {string}")
+ public void
checkANewMessagesArePulledAndAckedBySimpleConsumerWithinS(String arg0, String
arg1, String arg2, int arg3, String arg4) {
+
+ }
+
+ @And("SimpleConsumer invokes receive\\(maxMessageNum:{string},
invisibleDuration:{string}, changeInvisibleDuration:{string}) {string} without
returning ack")
+ public void
simpleconsumerInvokesReceiveMaxMessageNumInvisibleDurationChangeInvisibleDurationWithoutReturningAck(String
arg0, String arg1, String arg2, String arg3) {
+
+ }
+
+ @And("SimpleConsumer invokes receive\\(maxMessageNum:{string},
invisibleDuration:{string}) during next consumption and returns an ack")
+ public void
simpleconsumerInvokesReceiveMaxMessageNumInvisibleDurationDuringNextConsumptionAndReturnsAnAck(String
arg0, String arg1) {
+
+ }
+
+ @Then("Check SimpleConsumer pulls the message and returns an ack within
{int}s but over {int}s")
+ public void
checkSimpleConsumerPullsTheMessageAndReturnsAnAckWithinSButOverS(int arg0, int
arg1) {
+
+ }
+
+ @And("SimpleConsumer invokes receive\\(maxMessageNum:{string},
invisibleDuration:{string}) during next consumption")
+ public void
simpleconsumerInvokesReceiveMaxMessageNumInvisibleDurationDuringNextConsumption(String
arg0, String arg1) {
+
+ }
+
+ @Then("Set SimpleConsumer changeInvisibleDuration\\({string})")
+ public void setSimpleConsumerChangeInvisibleDuration(String arg0) {
+
+ }
+
+ @And("Check changeInvisibleDuration after the ack is failed")
+ public void checkChangeInvisibleDurationAfterTheAckIsFailed() {
+
+ }
+
+ @And("Check the consumption is failed")
+ public void checkTheConsumptionIsFailed() {
+
+ }
+
+ @And("SimpleConsumer invokes the method receive\\(maxMessageNum:{string},
invisibleDuration:{string}) {string}")
+ public void
simpleconsumerInvokesTheMethodReceiveMaxMessageNumInvisibleDuration(String
arg0, String arg1, String arg2) {
+
+ }
+
+ @Then("SimpleConsumer returns acks {string}")
+ public void simpleconsumerReturnsAcks(String arg0) {
+ }
+
+ @Then("Check all {string} messages are received and acked within {int}s
{string}")
+ public void checkAllMessagesAreReceivedAndAckedWithinS(String arg0, int
arg1, String arg2) {
+ }
+
+ @Then("SimpleConsumer returns acks for all received messages except the
first one with Body\\({string})")
+ public void
simpleconsumerReturnsAcksForAllReceivedMessagesExceptTheFirstOneWithBody(String
arg0) {
+
+ }
+
+ @Then("Check only the first {string} messages with Body\\({string}) that
can be received")
+ public void checkOnlyTheFirstMessagesWithBodyThatCanBeReceived(String
arg0, String arg1) {
+ }
+
+ @And("Check all received messages that can be consumed again {string}")
+ public void checkAllReceivedMessagesThatCanBeConsumedAgain(String arg0) {
+ }
+
+ @And("Check the number of consumptions equals to {int}")
+ public void checkTheNumberOfConsumptionsEqualsTo(int arg0) {
+ }
+
+ @And("SimpleConsumer returns an ack after DeliveryAttempt\\({int})")
+ public void simpleconsumerReturnsAnAckAfterDeliveryAttempt(int arg0) {
+ }
+
+ @And("Check all messages that can be consumed and acked within {int}s")
+ public void checkAllMessagesThatCanBeConsumedAndAckedWithinS(int arg0) {
+
+ }
+
+ @Then("SimpleConsumer returns acks for all received messages except the
first one")
+ public void
simpleconsumerReturnsAcksForAllReceivedMessagesExceptTheFirstOne() {
+ }
+
+ @And("Check only the first message is not acked")
+ public void checkOnlyTheFirstMessageIsNotAcked() {
+ }
+
+ @And("SimpleConsumer returns acks for all received messages")
+ public void simpleconsumerReturnsAcksForAllReceivedMessages() {
+ }
+
+ @And("SimpleConsumer returns an ack when DeliveryAttempt value equals
{int}")
+ public void simpleconsumerReturnsAnAckWhenDeliveryAttemptValueEquals(int
arg0) {
+ }
+
+ @Then("Check SimpleConsumer receives only up to {int} messages once")
+ public void checkSimpleConsumerReceivesOnlyUpToMessagesOnce(int arg0) {
+ }
+
+ @And("Check no acked messages that can be consumed again")
+ public void checkNoAckedMessagesThatCanBeConsumedAgain() {
+
+ }
+
+ @Then("SimpleConsumer returns ack for all received messages except the
first one")
+ public void
simpleconsumerReturnsAckForAllReceivedMessagesExceptTheFirstOne() {
+ }
+
+ @And("SimpleConsumer waits for {int}s after receiving the messages")
+ public void simpleconsumerWaitsForSAfterReceivingTheMessages(int arg0) {
+
+ }
+
+ @Then("SimpleConsumer returns an ack")
+ public void simpleconsumerReturnsAnAck() {
+ }
}
diff --git a/bdd/src/main/resources/server/abnormal/PushConsumerRetry.feature
b/bdd/src/main/resources/server/abnormal/PushConsumerRetry.feature
new file mode 100644
index 0000000..be043fe
--- /dev/null
+++ b/bdd/src/main/resources/server/abnormal/PushConsumerRetry.feature
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Feature: Test retry of PushConsumer
+
+ Scenario: The normal message is sent, and after the PushConsumer partial
retry, the retry message is expected to be consumed
+ Given Create a "Normal" topic:"random-topic" if not exist, a
"Concurrently" group:"random-group"
+ When Create a "PushConsumer", set the
ClientConfiguration(Endpoint:"127.0.0.1:9876"), ConsumerGroup("random-group"),
SubscriptionExpressions("random-topic", "random-FilterExpression"),
ConsumptionThreadCount(20)
+ And Set PushConsumer Listener, "half" of the number of pre-sent messages
are consumed successful and return SUCCESS, another "half" are consumed failed
and return FAILURE
+ And Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("10s"), Topic("random-topic")
+ Then Create a message, including the Topic("random-topic"), Tag("TagA"),
Key("Key"), and Body("Body")
+ And Send "10" messages "synchronous"
+ Then Check all messages send "successfully"
+ Then Check NoRetryMessages equals to RetryMessages and equals to "half" of
the number of pre-sent messages
+ And Check all messages are contained in retryMessages or noRetryMessages
+ And Shutdown the producer and consumer if they are started
+
+ Scenario: Order messages are sent, and after the PushConsumer partial retry,
the retry messages are expected to be consumed, and the message consumption
order is expected to be consistent with the send order
+ Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently"
group:"random-group"
+ When Create a "PushConsumer", set the
ClientConfiguration(Endpoint:"127.0.0.1:9876"), ConsumerGroup("random-group"),
SubscriptionExpressions("random-topic", "random-FilterExpression"),
ConsumptionThreadCount(20), and MessageListener("default")
+ And Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("10s"), Topic("random-topic")
+ Then Create a message, including the Topic("random-topic"), Tag("TagA"),
Key("Key"), Body("Body"), and MessageGroup("group")
+ And Send "10" messages "synchronous"
+ Then Check all messages send "successfully"
+ And Wait until all the messages that can be consumed
+ And Check the order of received messages consistent with the order of
pre-sent messages
+ And Shutdown the producer and consumer if they are started
+
+ Scenario: Send sequential messages, using three Shardingkeys, after partial
retries, expect to consume retry messages, and expect the order of message
consumption to be consistent with the order of message delivery
+ Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently"
group:"random-group"
+ When Create a "PushConsumer", set the
ClientConfiguration(Endpoint:"127.0.0.1:9876"), ConsumerGroup("random-group"),
SubscriptionExpressions("random-topic", "random-FilterExpression"),
ConsumptionThreadCount(20), and MessageListener("default")
+ And Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("10s"), Topic("random-topic")
+ Then Create a message, including the Topic("random-topic"), Tag("TagA"),
Key("Key"), Body("Body"), and MessageGroup("group")
+ And Send "10" messages "synchronous"
+ Then Check all messages send "successfully"
+ And Wait until all the messages that can be consumed
+ Then Separate the messages into 3 ShardingKeyGroups according to
messageGroup
+ And Check the message order in each ShardingKeyGroup
+ And Shutdown the producer and consumer if they are started
+
diff --git a/bdd/src/main/resources/simple/SimpleAck.feature
b/bdd/src/main/resources/simple/SimpleAck.feature
new file mode 100644
index 0000000..2c6999f
--- /dev/null
+++ b/bdd/src/main/resources/simple/SimpleAck.feature
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Feature: Test SimpleConsumer pulls normal messages and returns acks
synchronously or asynchronously
+
+ Scenario Outline: Send 20 normal messages synchronously and expect to
consume with receive()/receiveAsync() and ack()/ackAsync() messages successfully
+ Given Create a "Normal" topic:"random-topic" if not exist, a
"Concurrently" group:"random-group"
+ When Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("10s"), Topic("random-topic")
+ And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"),
Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"),
Duration("10s")
+ And Create a message, including the Topic("random-topic"), Tag("TagA"),
Key("Key"), and Body("Body")
+ And Send "20" messages "synchronously"
+ Then Check all messages send "successfully"
+ And SimpleConsumer invokes the method receive(maxMessageNum:"1",
invisibleDuration:"10s") "<TransmissionMode>"
+ Then SimpleConsumer returns acks "<AckMode>"
+ Then Check all "Normal" messages are received and acked within 90s
"successfully"
+ And Shutdown the producer and consumer if they are started
+
+ Examples:
+ | TransmissionMode | AckMode |
+ | synchronously | synchronously |
+ | asynchronously | synchronously |
+ | synchronously | asynchronously |
+ | asynchronously | asynchronously |
+
+
+
diff --git a/bdd/src/main/resources/simple/SimpleMsgType.feature
b/bdd/src/main/resources/simple/SimpleMsgType.feature
new file mode 100644
index 0000000..4ce6002
--- /dev/null
+++ b/bdd/src/main/resources/simple/SimpleMsgType.feature
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Feature: Test SimpleConsumer pulls and acks order/delay/transaction messages
synchronously and properly
+
+ Scenario: Send 20 order messages synchronously, expect SimpleConsumer to
receive() and ack() messages properly and the order to be maintained
+ Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently"
group:"random-group"
+ When Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("10s"), Topic("random-topic")
+ And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"),
Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"),
Duration("10s")
+ And Create "a" messages, including the Topic("Topic"), Tag("Tag"),
Key("Key"), MessageGroup("Group"), and Body("Body")
+ And Send "20" messages "synchronously"
+ Then Check all messages send "successfully"
+ And SimpleConsumer invokes the method receive(maxMessageNum:"1",
invisibleDuration:"10s") "synchronously"
+ Then SimpleConsumer returns acks "synchronously"
+ Then Check all "FIFO" messages are received and acked within 90s
"successfully"
+ And Shutdown the producer and consumer if they are started
+
+ Scenario: Send 10 delay messages synchronously, expect SimpleConsumer to
receive() and ack() messages properly
+ Given Create a "Delay" topic:"random-topic" if not exist, a "Concurrently"
group:"random-group"
+ When Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("10s"), Topic("random-topic")
+ And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"),
Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"),
Duration("10s")
+ And Create a message, including the Topic("random-topic"), Tag("TagA"),
Body("Body"), deliveryTimestamp("10s")
+ And Send "10" messages "synchronously"
+ Then Check all messages send "successfully"
+ And SimpleConsumer invokes the method receive(maxMessageNum:"1",
invisibleDuration:"10s") "synchronously"
+ Then SimpleConsumer returns acks "synchronously"
+ Then Check all "Delay" messages are received and acked within 90s
"successfully"
+ And Shutdown the producer and consumer if they are started
+
+ Scenario: Send 10 transaction messages synchronously, expect SimpleConsumer
to receive() and ack() messages properly
+ Given Create a "Transaction" topic:"random-topic" if not exist, a
"Concurrently" group:"random-group"
+ When Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("10s"), Topic("random-topic")
+ And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"),
Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"),
Duration("10s")
+ And Create a message, including the Topic("random-topic"), Tag("TagA"),
Key("Key"), and Body("Body")
+ And Send "10" messages "synchronously"
+ Then Check all messages send "successfully"
+ And SimpleConsumer invokes the method receive(maxMessageNum:"1",
invisibleDuration:"10s") "synchronously"
+ Then SimpleConsumer returns acks "synchronously"
+ Then Check all "Transaction" messages are received and acked within 90s
"successfully"
+ And Shutdown the producer and consumer if they are started
diff --git a/bdd/src/main/resources/simple/SimpleOrderParam.feature
b/bdd/src/main/resources/simple/SimpleOrderParam.feature
new file mode 100644
index 0000000..0d509ec
--- /dev/null
+++ b/bdd/src/main/resources/simple/SimpleOrderParam.feature
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Feature: Test SimpleConsumer pulls, acks and retries order messages
+
+ Scenario: Send 20 order messages synchronously with the same MessageGroup,
then SimpleConsumer consumes messages orderly with receive() but not ack()
messages, expect messages to be stuck at the first message
+ Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently"
group:"random-group"
+ When Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("10s"), Topic("random-topic")
+ And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"),
Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"),
Duration("10s")
+ And Create "20" messages, including the Topic("random-topic"),
Tag("TagA"), Key("Key"), MessageGroup("group1"), and Body("1-20")
+ And Send "20" messages "synchronously"
+ Then Check all messages send "successfully"
+ And SimpleConsumer invokes the method receive(maxMessageNum:"3",
invisibleDuration:"10s") "synchronously"
+ Then SimpleConsumer returns acks for all received messages except the
first one with Body("1")
+ Then Check only the first "1" messages with Body("1") that can be received
+ And Shutdown the producer and consumer if they are started
+
+# Scenario: Send 20 order messages synchronously with eight different
MessageGroups, then SimpleConsumer consumes messages orderly with receive() but
not ack() messages, expect the order to be maintained
+# Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently"
group:"random-group"
+# When Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("random-group"), Topic("random-topic")
+# And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"),
Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"),
Duration("10s")
+# And Create "20" messages, including the Topic("random-topic"),
Tag("TagA"), Key("Key"), MessageGroup("8 different groups"), and Body("Body")
+# And Send "20" messages "synchronously"
+# Then Check all messages send "successfully"
+# And SimpleConsumer invokes receive(1) "synchronously"
+# Then Check only the first message that can be pulled by SimpleConsumer
+# And Shutdown the producer and consumer if they are started
+
+
+ Scenario: Send 20 order messages synchronously, then SimpleConsumer invokes
receive(3) in batch, and all pulled messages return ack() except the first one,
expect the order to be maintained and the messages to be consumed again after a
certain time
+ Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently"
group:"random-group"
+ When Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("10s"), Topic("random-topic")
+ And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"),
Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"),
Duration("10s")
+ And Create "20" messages, including the Topic("random-topic"),
Tag("TagA"), Key("Key"), MessageGroup("group1"), and Body("1-20")
+ And Send "20" messages "synchronously"
+ And Check all messages send "successfully"
+ And SimpleConsumer invokes the method receive(maxMessageNum:"3",
invisibleDuration:"10s") "synchronously"
+ Then SimpleConsumer returns acks for all received messages except the
first one with Body("1")
+ And Check only the first "3" messages with Body("1-3") that can be received
+ And Check all received messages that can be consumed again "successfully"
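+# The first message is not acked, so it will be pulled again; because the messages sent are ordered, the subsequent messages will also be pulled again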
+ And Shutdown the producer and consumer if they are started
+#
+# Scenario: Send 20 order messages synchronously, then SimpleConsumer invokes
receive(3) in batch, only ack() the first one of pulled messages, expect the
order to be maintained and other messages to be consumed again after a certain
time
+# Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently"
group:"random-group"
+# When Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("random-group"), Topic("random-topic")
+# And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"),
Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"),
Duration("10s")
+# And Create "20" messages, including the Topic("random-topic"),
Tag("TagA"), Key("Key"), MessageGroup("group1"), and Body("Body")
+# And Send "20" messages "synchronously"
+# And Check all messages send "successfully"
+# Then SimpleConsumer invokes receive(3) "synchronously" and returns acks
"synchronously" for "the first one" pulled messages
+# And Check the pulled messages that can be retried except the first one
+# And Check the number of retried messages equals to 2
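+## If three messages are pulled together and only the first one is acked, the remaining two will be retried, so viewList.stream().filter(msg ->
msg.getDeliveryAttempt() == 1).count()
should be 2; the original test case expects 1, and getDeliveryAttempt counts from 0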
+# And Shutdown the producer and consumer if they are started
diff --git a/bdd/src/main/resources/simple/SimpleParameter.feature
b/bdd/src/main/resources/simple/SimpleParameter.feature
new file mode 100644
index 0000000..3717c67
--- /dev/null
+++ b/bdd/src/main/resources/simple/SimpleParameter.feature
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Feature: Test some complex scenarios about pulling and ack of SimpleConsumer
+
+ Scenario: Send a normal message, then the SimpleConsumer invokes
receive(1,10s), and an ack is returned after three retries, expect retry times
to be 3 and retry interval to be 10s
+ Given Create a "Normal" topic:"random-topic" if not exist, a
"Concurrently" group:"random-group"
+ When Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("10s"), Topic("random-topic")
+ And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"),
Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"),
Duration("10s")
+ And Create a message, including the Topic("random-topic"), Tag("TagA"),
Key("Key"), and Body("Body")
+ And Send "a" messages "synchronously"
+ Then Check all messages send "successfully"
+ And SimpleConsumer invokes the method receive(maxMessageNum:"1",
invisibleDuration:"10s") "synchronously"
+ And SimpleConsumer returns an ack when DeliveryAttempt value equals 4
+ Then Check the duration between each two retrying consumptions equals to
10s
+ And Check the number of consumptions equals to 4
+ And Check all messages that can be consumed and acked within 90s
+ And Shutdown the producer and consumer if they are started
+
+ Scenario: Send a normal message, then the SimpleConsumer invokes
receive(1,10s), and an ack is returned after waiting 11s. If an
INVALID_RECEIPT_HANDLE error message is displayed, then receive a new message
again and return an ack successfully
+ Given Create a "Normal" topic:"random-topic" if not exist, a
"Concurrently" group:"random-group"
+ When Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("10s"), Topic("random-topic")
+ And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"),
Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"),
Duration("10s")
+ And Create a message, including the Topic("random-topic"), Tag("TagA"),
Key("Key"), and Body("Body")
+ And Send "a" messages "synchronously"
+ Then Check all messages send "successfully"
+ And SimpleConsumer invokes the method receive(maxMessageNum:"1",
invisibleDuration:"10s") "synchronously"
+ And SimpleConsumer waits for 11s after receiving the messages
+ Then SimpleConsumer returns an ack
+ Then Check exceptions can be thrown
+ And Check a new "Normal" messages are pulled "synchronously" and acked
"synchronously" by SimpleConsumer within 90s "successfully"
+ And Shutdown the producer and consumer if they are started
+
+# Scenario: Send a normal message, then the SimpleConsumer invokes
receive(1,10s), then delay the invisibleTime to 20s, and consume the messages
again with receive(1,30s), expect to receive and ack in 30s
+# Given Create a "Normal" topic:"random-topic" if not exist, a
"Concurrently" group:"random-group"
+# When Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("random-group"), Topic("random-topic")
+# And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"),
Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"),
Duration("10s")
+# And Create a message, including the Topic("random-topic"), Tag("TagA"),
Key("Key"), and Body("Body")
+# And Send "a" messages "synchronously"
+# Then Check all messages send "successfully"
+# And SimpleConsumer invokes receive(maxMessageNum:"1",
invisibleDuration:"10s", changeInvisibleDuration:"20s") "synchronously" without
returning ack
+# And Check all messages that can be consumed and acked within 90s
+# And SimpleConsumer invokes receive(maxMessageNum:"1",
invisibleDuration:"30s") during next consumption
+# And SimpleConsumer ack "a" messages
+# Then Check SimpleConsumer pulls the message and returns an ack within 30s
but over 20s
+## The case description says within 20s, but the check is for more than 20s and less than 30s
+# And Shutdown the producer and consumer if they are started
+
+# Scenario: Send a normal message, then the SimpleConsumer invokes
receive(1,10s), acks and changeInvisibleDuration, expect to change indicating
illegal ReceiptHandle
+# Given Create a "Normal" topic:"random-topic" if not exist, a
"Concurrently" group:"random-group"
+# When Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("random-group"), Topic("random-topic")
+# And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"),
Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"),
Duration("10s")
+# And Create a message, including the Topic("random-topic"), Tag("TagA"),
Key("Key"), and Body("Body")
+# And Send "a" messages "synchronously"
+# Then Check all messages send "successfully"
+# And SimpleConsumer invokes receive(maxMessageNum:"1",
invisibleDuration:"10s") "synchronously"
+# And SimpleConsumer ack "a" messages
+# Then Set SimpleConsumer changeInvisibleDuration("10s")
+# And Check changeInvisibleDuration after the ack is failed
+# And Check the consumption is failed
+# And Shutdown the producer and consumer if they are started
+
+ Scenario: Send 300 normal messages synchronously, then the SimpleConsumer
invokes receive(50,10s), expect at most 32 messages to be consumed and acked
at a time, and no acked messages to be consumed again
+ Given Create a "Normal" topic:"random-topic" if not exist, a
"Concurrently" group:"random-group"
+ When Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("10s"), Topic("random-topic")
+ And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"),
Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"),
Duration("10s")
+ And Create a message, including the Topic("random-topic"), Tag("TagA"),
Key("Key"), and Body("Body")
+ And Send "300" messages "synchronously"
+ Then Check all messages send "successfully"
+ And SimpleConsumer invokes the method receive(maxMessageNum:"50",
invisibleDuration:"10s") "synchronously"
+ Then Check SimpleConsumer receives only up to 32 messages once
+ And SimpleConsumer returns acks for all received messages
+ And Check no acked messages that can be consumed again
+ And Check all messages that can be consumed within 60s
+ And Shutdown the producer and consumer if they are started
+
+ Scenario: Send 20 normal messages synchronously, then SimpleConsumer invokes
receive(50) in batch, and all pulled messages return ack() except the first
one, expect the consumption not to be affected and the acked messages not to be
consumed again
+ Given Create a "Normal" topic:"random-topic" if not exist, a
"Concurrently" group:"random-group"
+ When Create a Producer, set the Endpoint("127.0.0.1:9876"),
RequestTimeout:("10s"), Topic("random-topic")
+ And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"),
Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"),
Duration("10s")
+ And Create a message, including the Topic("random-topic"), Tag("TagA"),
Key("Key"), and Body("Body")
+ And Send "20" messages "synchronously"
+ Then Check all messages send "successfully"
+ And SimpleConsumer invokes the method receive(maxMessageNum:"50",
invisibleDuration:"10s") "synchronously"
+ Then SimpleConsumer returns ack for all received messages except the first
one
+ And Check SimpleConsumer receives only up to 32 messages once
+ And Check all messages that can be consumed within 90s
+ And Check only the first message is not acked
+ And Shutdown the producer and consumer if they are started
+
+
diff --git
a/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleOrderParamTest.java
b/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleOrderParamTest.java
index 2351a99..09587d7 100644
---
a/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleOrderParamTest.java
+++
b/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleOrderParamTest.java
@@ -328,6 +328,7 @@ public class SimpleOrderParamTest extends BaseOperate {
List<MessageView> viewList = entry.getValue();
long actual = viewList.stream().filter(msg ->
msg.getDeliveryAttempt() == 2).count();
Assertions.assertEquals(1, actual, String.format("The
number of message retries obtained was not expected, expect:%s, actual:%s", 1,
actual));
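+// DeliveryAttempt counts from 0, so a retry should be detected with getDeliveryAttempt()==1;
three messages are pulled together and the first one is not retried, leaving two retried messages, so actual should be 2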
}
}
} catch (InterruptedException e) {