This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new bcc9db5cba [ISSUE #7614] Fix flaky test RocksDBMessageStoreTest
(#7625)
bcc9db5cba is described below
commit bcc9db5cbafba6096daf6171f4d348e146bf5c17
Author: Zhanhui Li <[email protected]>
AuthorDate: Mon Dec 11 17:37:01 2023 +0800
[ISSUE #7614] Fix flaky test RocksDBMessageStoreTest (#7625)
* fix #7614 Flaky test RocksDBMessageStoreTest
Signed-off-by: lizhanhui <[email protected]>
* fix: give TimerMessageStoreTest#testStateAndRecover more time for
Awaitability to poll and check
Signed-off-by: lizhanhui <[email protected]>
* clean up exclude test list of bazel
Signed-off-by: lizhanhui <[email protected]>
---------
Signed-off-by: lizhanhui <[email protected]>
---
store/BUILD.bazel | 4 +-
.../rocketmq/store/RocksDBMessageStoreTest.java | 162 ++++++++++++---------
.../store/timer/TimerMessageStoreTest.java | 4 +-
3 files changed, 94 insertions(+), 76 deletions(-)
diff --git a/store/BUILD.bazel b/store/BUILD.bazel
index 4b046c68eb..b884503b08 100644
--- a/store/BUILD.bazel
+++ b/store/BUILD.bazel
@@ -69,9 +69,8 @@ java_library(
GenTestRules(
name = "GeneratedTestRules",
exclude_tests = [
- # This test is extremely slow and flaky, exclude it.
+ # These tests are extremely slow and flaky, exclude them before they
are properly fixed.
"src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest",
- "src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest",
],
medium_tests = [
"src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest",
@@ -80,6 +79,7 @@ GenTestRules(
"src/test/java/org/apache/rocketmq/store/MappedFileQueueTest",
"src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest",
"src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest",
+ "src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest",
],
test_files = glob(["src/test/java/**/*Test.java"]),
deps = [
diff --git
a/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java
index acf5edf511..2af07197a5 100644
--- a/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java
@@ -60,15 +60,18 @@ import
org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.assertj.core.util.Strings;
+import org.awaitility.Awaitility;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@RunWith(MockitoJUnitRunner.class)
@@ -77,7 +80,7 @@ public class RocksDBMessageStoreTest {
private final String messageTopic = "FooBar";
private final String storeType = StoreType.DEFAULT_ROCKSDB.getStoreType();
private int queueTotal = 100;
- private AtomicInteger queueId = new AtomicInteger(0);
+ private final AtomicInteger queueId = new AtomicInteger(0);
private SocketAddress bornHost;
private SocketAddress storeHost;
private byte[] messageBody;
@@ -171,27 +174,27 @@ public class RocksDBMessageStoreTest {
if (notExecuted()) {
return;
}
- long ipv4HostMsgs = 10;
- long ipv6HostMsgs = 10;
- long totalMsgs = ipv4HostMsgs + ipv6HostMsgs;
+ long ipv4HostMessages = 10;
+ long ipv6HostMessages = 10;
+ long totalMessages = ipv4HostMessages + ipv6HostMessages;
queueTotal = 1;
messageBody = storeMessage.getBytes();
- for (long i = 0; i < ipv4HostMsgs; i++) {
+ for (long i = 0; i < ipv4HostMessages; i++) {
messageStore.putMessage(buildMessage());
}
- for (long i = 0; i < ipv6HostMsgs; i++) {
+ for (long i = 0; i < ipv6HostMessages; i++) {
messageStore.putMessage(buildIPv6HostMessage());
}
StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore);
- for (long i = 0; i < totalMsgs; i++) {
+ for (long i = 0; i < totalMessages; i++) {
GetMessageResult result = messageStore.getMessage("GROUP_A",
"FooBar", 0, i, 1024 * 1024, null);
assertThat(result).isNotNull();
result.release();
}
- verifyThatMasterIsFunctional(totalMsgs, messageStore);
+ verifyThatMasterIsFunctional(totalMessages, messageStore);
}
@Test
@@ -549,15 +552,13 @@ public class RocksDBMessageStoreTest {
try {
msg.setBornHost(new
InetSocketAddress(InetAddress.getByName("1050:0000:0000:0000:0005:0600:300c:326b"),
0));
} catch (UnknownHostException e) {
- e.printStackTrace();
- assertThat(Boolean.FALSE).isTrue();
+ fail("build IPv6 host message error", e);
}
try {
msg.setStoreHost(new
InetSocketAddress(InetAddress.getByName("::1"), 0));
} catch (UnknownHostException e) {
- e.printStackTrace();
- assertThat(Boolean.FALSE).isTrue();
+ fail("build IPv6 host message error", e);
}
return msg;
}
@@ -582,27 +583,27 @@ public class RocksDBMessageStoreTest {
}
@Test
- public void testGroupCommit() throws Exception {
+ public void testGroupCommit() {
if (notExecuted()) {
return;
}
- long totalMsgs = 10;
+ long totalMessages = 10;
queueTotal = 1;
messageBody = storeMessage.getBytes();
- for (long i = 0; i < totalMsgs; i++) {
+ for (long i = 0; i < totalMessages; i++) {
messageStore.putMessage(buildMessage());
}
- for (long i = 0; i < totalMsgs; i++) {
+ for (long i = 0; i < totalMessages; i++) {
GetMessageResult result = messageStore.getMessage("GROUP_A",
"TOPIC_A", 0, i, 1024 * 1024, null);
assertThat(result).isNotNull();
result.release();
}
- verifyThatMasterIsFunctional(totalMsgs, messageStore);
+ verifyThatMasterIsFunctional(totalMessages, messageStore);
}
@Test
- public void testMaxOffset() throws InterruptedException {
+ public void testMaxOffset() {
if (notExecuted()) {
return;
}
@@ -618,11 +619,11 @@ public class RocksDBMessageStoreTest {
messageStore.putMessage(msg);
}
- while (messageStore.dispatchBehindBytes() != 0) {
- TimeUnit.MILLISECONDS.sleep(1);
- }
-
- assertThat(messageStore.getMaxOffsetInQueue(messageTopic,
queueId)).isEqualTo(firstBatchMessages);
+ Awaitility.await()
+ .with()
+ .atMost(3, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.MILLISECONDS)
+ .until(() -> messageStore.getMaxOffsetInQueue(messageTopic,
queueId) == firstBatchMessages);
// Disable the dispatcher
messageStore.getDispatcherList().clear();
@@ -644,14 +645,14 @@ public class RocksDBMessageStoreTest {
return buildIPv6HostMessage(messageBody, "FooBar");
}
- private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore
master) {
- for (long i = 0; i < totalMsgs; i++) {
+ private void verifyThatMasterIsFunctional(long totalMessages, MessageStore
master) {
+ for (long i = 0; i < totalMessages; i++) {
master.putMessage(buildMessage());
}
StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore);
- for (long i = 0; i < totalMsgs; i++) {
+ for (long i = 0; i < totalMessages; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "FooBar",
0, i, 1024 * 1024, null);
assertThat(result).isNotNull();
result.release();
@@ -660,7 +661,7 @@ public class RocksDBMessageStoreTest {
}
@Test
- public void testPullSize() throws Exception {
+ public void testPullSize() {
if (notExecuted()) {
return;
}
@@ -673,9 +674,11 @@ public class RocksDBMessageStoreTest {
messageStore.putMessage(messageExtBrokerInner);
}
// wait for consume queue build
- // the sleep time should be great than consume queue flush interval
- //Thread.sleep(100);
- StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore);
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
+ .with()
+ .pollInterval(10, TimeUnit.MILLISECONDS)
+ .until(() -> messageStore.getMaxOffsetInQueue(topic, 0) >= 32);
+
String group = "simple";
GetMessageResult getMessageResult32 = messageStore.getMessage(group,
topic, 0, 0, 32, null);
assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32);
@@ -705,21 +708,25 @@ public class RocksDBMessageStoreTest {
messageStore.putMessage(messageExtBrokerInner);
}
- // Thread.sleep(100);//wait for build consumer queue
- StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore);
+ // wait for build consumer queue
+ Awaitility.await()
+ .with()
+ .pollInterval(100, TimeUnit.MILLISECONDS)
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> messageStore.getMaxOffsetInQueue(topic, 0) >=
100);
long maxPhyOffset = messageStore.getMaxPhyOffset();
long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
//1.just reboot
messageStore.shutdown();
- String storeRootDir = ((RocksDBMessageStore)
messageStore).getMessageStoreConfig().getStorePathRootDir();
+ String storeRootDir =
messageStore.getMessageStoreConfig().getStorePathRootDir();
messageStore = buildMessageStore(storeRootDir, topic);
boolean load = messageStore.load();
assertTrue(load);
messageStore.start();
- assertTrue(maxPhyOffset == messageStore.getMaxPhyOffset());
- assertTrue(maxCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
+ assertEquals(maxPhyOffset, messageStore.getMaxPhyOffset());
+ assertEquals(maxCqOffset, messageStore.getMaxOffsetInQueue(topic, 0));
//2.damage commit-log and reboot normal
for (int i = 0; i < 100; i++) {
@@ -728,20 +735,25 @@ public class RocksDBMessageStoreTest {
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
- //Thread.sleep(100);
- StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore);
+
+ Awaitility.await()
+ .with()
+ .pollInterval(100, TimeUnit.MILLISECONDS)
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> messageStore.getMaxOffsetInQueue(topic, 0) >=
200);
+
long secondLastPhyOffset = messageStore.getMaxPhyOffset();
long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
+ // Append a message to corrupt
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
-
messageStore.shutdown();
- //damage last message
+ // Corrupt the last message
damageCommitLog((RocksDBMessageStore) messageStore,
secondLastPhyOffset);
//reboot
@@ -749,18 +761,23 @@ public class RocksDBMessageStoreTest {
load = messageStore.load();
assertTrue(load);
messageStore.start();
- assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
- assertTrue(secondLastCqOffset ==
messageStore.getMaxOffsetInQueue(topic, 0));
+ assertEquals(secondLastPhyOffset, messageStore.getMaxPhyOffset());
+ assertEquals(secondLastCqOffset,
messageStore.getMaxOffsetInQueue(topic, 0));
- //3.damage commitlog and reboot abnormal
+ //3.Corrupt commit-log and reboot abnormal
for (int i = 0; i < 100; i++) {
messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
- //Thread.sleep(100);
- StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore);
+
+ Awaitility.await()
+ .with()
+ .pollInterval(100, TimeUnit.MILLISECONDS)
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> messageStore.getMaxOffsetInQueue(topic, 0) >=
300);
+
secondLastPhyOffset = messageStore.getMaxPhyOffset();
secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
@@ -770,20 +787,20 @@ public class RocksDBMessageStoreTest {
messageStore.putMessage(messageExtBrokerInner);
messageStore.shutdown();
- //damage last message
+ //Corrupt the last message
damageCommitLog((RocksDBMessageStore) messageStore,
secondLastPhyOffset);
//add abort file
- String fileName =
StorePathConfigHelper.getAbortFile(((RocksDBMessageStore)
messageStore).getMessageStoreConfig().getStorePathRootDir());
+ String fileName =
StorePathConfigHelper.getAbortFile(messageStore.getMessageStoreConfig().getStorePathRootDir());
File file = new File(fileName);
UtilAll.ensureDirOK(file.getParent());
- file.createNewFile();
+ assertTrue(file.createNewFile());
messageStore = buildMessageStore(storeRootDir, topic);
load = messageStore.load();
assertTrue(load);
messageStore.start();
- assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
- assertTrue(secondLastCqOffset ==
messageStore.getMaxOffsetInQueue(topic, 0));
+ assertEquals(secondLastPhyOffset, messageStore.getMaxPhyOffset());
+ assertEquals(secondLastCqOffset,
messageStore.getMaxOffsetInQueue(topic, 0));
//message write again
for (int i = 0; i < 100; i++) {
@@ -860,7 +877,8 @@ public class RocksDBMessageStoreTest {
MessageExtBatch msgExtBatch = buildMessageBatch(msgBatch);
try {
- PutMessageResult result =
this.messageStore.putMessages(msgExtBatch);
+ this.messageStore.putMessages(msgExtBatch);
+ fail("Should have raised an exception");
} catch (Exception e) {
assertThat(e.getMessage()).contains("message body size exceeded");
}
@@ -871,7 +889,7 @@ public class RocksDBMessageStoreTest {
if (notExecuted()) {
return;
}
- MessageStoreConfig messageStoreConfig = ((RocksDBMessageStore)
this.messageStore).getMessageStoreConfig();
+ MessageStoreConfig messageStoreConfig =
this.messageStore.getMessageStoreConfig();
messageStoreConfig.setBrokerRole(BrokerRole.SYNC_MASTER);
messageStoreConfig.setTotalReplicas(2);
messageStoreConfig.setInSyncReplicas(2);
@@ -890,7 +908,7 @@ public class RocksDBMessageStoreTest {
if (notExecuted()) {
return;
}
- MessageStoreConfig messageStoreConfig = ((RocksDBMessageStore)
this.messageStore).getMessageStoreConfig();
+ MessageStoreConfig messageStoreConfig =
this.messageStore.getMessageStoreConfig();
messageStoreConfig.setBrokerRole(BrokerRole.SYNC_MASTER);
messageStoreConfig.setTotalReplicas(2);
messageStoreConfig.setInSyncReplicas(2);
@@ -930,13 +948,13 @@ public class RocksDBMessageStoreTest {
}
@Test
- public void testPutLongMessage() throws Exception {
+ public void testPutLongMessage() {
if (notExecuted()) {
return;
}
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
- CommitLog commitLog = ((RocksDBMessageStore)
messageStore).getCommitLog();
- MessageStoreConfig messageStoreConfig = ((RocksDBMessageStore)
messageStore).getMessageStoreConfig();
+ CommitLog commitLog = messageStore.getCommitLog();
+ MessageStoreConfig messageStoreConfig =
messageStore.getMessageStoreConfig();
MessageExtEncoder.PutMessageThreadLocal putMessageThreadLocal =
commitLog.getPutMessageThreadLocal().get();
//body size, topic size, properties size exactly equal to max size
@@ -944,30 +962,30 @@ public class RocksDBMessageStoreTest {
messageExtBrokerInner.setTopic(new String(new byte[127]));
messageExtBrokerInner.setPropertiesString(new String(new
byte[Short.MAX_VALUE]));
PutMessageResult encodeResult1 =
putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
- assertTrue(encodeResult1 == null);
+ assertNull(encodeResult1);
//body size exactly more than max message body size
messageExtBrokerInner.setBody(new
byte[messageStoreConfig.getMaxMessageSize() + 1]);
PutMessageResult encodeResult2 =
putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
- assertTrue(encodeResult2.getPutMessageStatus() ==
PutMessageStatus.MESSAGE_ILLEGAL);
+ assertSame(encodeResult2.getPutMessageStatus(),
PutMessageStatus.MESSAGE_ILLEGAL);
//body size exactly equal to max message size
messageExtBrokerInner.setBody(new
byte[messageStoreConfig.getMaxMessageSize() + 64 * 1024]);
PutMessageResult encodeResult3 =
putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
- assertTrue(encodeResult3.getPutMessageStatus() ==
PutMessageStatus.MESSAGE_ILLEGAL);
+ assertSame(encodeResult3.getPutMessageStatus(),
PutMessageStatus.MESSAGE_ILLEGAL);
//message properties length more than properties maxSize
messageExtBrokerInner.setBody(new
byte[messageStoreConfig.getMaxMessageSize()]);
messageExtBrokerInner.setPropertiesString(new String(new
byte[Short.MAX_VALUE + 1]));
PutMessageResult encodeResult4 =
putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
- assertTrue(encodeResult4.getPutMessageStatus() ==
PutMessageStatus.PROPERTIES_SIZE_EXCEEDED);
+ assertSame(encodeResult4.getPutMessageStatus(),
PutMessageStatus.PROPERTIES_SIZE_EXCEEDED);
//message length more than buffer length capacity
messageExtBrokerInner.setBody(new
byte[messageStoreConfig.getMaxMessageSize()]);
messageExtBrokerInner.setTopic(new String(new byte[Short.MAX_VALUE]));
messageExtBrokerInner.setPropertiesString(new String(new
byte[Short.MAX_VALUE]));
PutMessageResult encodeResult5 =
putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
- assertTrue(encodeResult5.getPutMessageStatus() ==
PutMessageStatus.MESSAGE_ILLEGAL);
+ assertSame(encodeResult5.getPutMessageStatus(),
PutMessageStatus.MESSAGE_ILLEGAL);
}
@Test
@@ -976,21 +994,21 @@ public class RocksDBMessageStoreTest {
return;
}
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
- MessageStoreConfig messageStoreConfig = ((RocksDBMessageStore)
messageStore).getMessageStoreConfig();
+ MessageStoreConfig messageStoreConfig =
messageStore.getMessageStoreConfig();
int originMaxMessageSize = messageStoreConfig.getMaxMessageSize();
messageExtBrokerInner.setBody(new byte[originMaxMessageSize + 10]);
PutMessageResult putMessageResult =
messageStore.putMessage(messageExtBrokerInner);
- assertTrue(putMessageResult.getPutMessageStatus() ==
PutMessageStatus.MESSAGE_ILLEGAL);
+ assertSame(putMessageResult.getPutMessageStatus(),
PutMessageStatus.MESSAGE_ILLEGAL);
int newMaxMessageSize = originMaxMessageSize + 10;
messageStoreConfig.setMaxMessageSize(newMaxMessageSize);
putMessageResult = messageStore.putMessage(messageExtBrokerInner);
- assertTrue(putMessageResult.getPutMessageStatus() ==
PutMessageStatus.PUT_OK);
+ assertSame(putMessageResult.getPutMessageStatus(),
PutMessageStatus.PUT_OK);
messageStoreConfig.setMaxMessageSize(10);
putMessageResult = messageStore.putMessage(messageExtBrokerInner);
- assertTrue(putMessageResult.getPutMessageStatus() ==
PutMessageStatus.MESSAGE_ILLEGAL);
+ assertSame(putMessageResult.getPutMessageStatus(),
PutMessageStatus.MESSAGE_ILLEGAL);
messageStoreConfig.setMaxMessageSize(originMaxMessageSize);
}
@@ -1013,11 +1031,11 @@ public class RocksDBMessageStoreTest {
}
consumeQueueTable.put(topicName, cqTable);
}
- Assert.assertEquals(consumeQueueTable.size(), 10);
+ assertEquals(consumeQueueTable.size(), 10);
HashSet<String> resultSet = Sets.newHashSet("topic-3", "topic-5");
messageStore.deleteTopics(Sets.difference(consumeQueueTable.keySet(),
resultSet));
- Assert.assertEquals(consumeQueueTable.size(), 2);
- Assert.assertEquals(resultSet, consumeQueueTable.keySet());
+ assertEquals(consumeQueueTable.size(), 2);
+ assertEquals(resultSet, consumeQueueTable.keySet());
}
@Test
@@ -1038,14 +1056,14 @@ public class RocksDBMessageStoreTest {
}
consumeQueueTable.put(topicName, cqTable);
}
- Assert.assertEquals(consumeQueueTable.size(), 10);
+ assertEquals(consumeQueueTable.size(), 10);
HashSet<String> resultSet = Sets.newHashSet("topic-3", "topic-5");
messageStore.cleanUnusedTopic(resultSet);
- Assert.assertEquals(consumeQueueTable.size(), 2);
- Assert.assertEquals(resultSet, consumeQueueTable.keySet());
+ assertEquals(consumeQueueTable.size(), 2);
+ assertEquals(resultSet, consumeQueueTable.keySet());
}
- private class MyMessageArrivingListener implements MessageArrivingListener
{
+ private static class MyMessageArrivingListener implements
MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long
tagsCode, long msgStoreTime,
byte[] filterBitMap, Map<String, String> properties) {
diff --git
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index 02ff35681d..4ce3985f6c 100644
---
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -426,8 +426,8 @@ public class TimerMessageStoreTest {
assertEquals(first.getCommitReadTimeMs(),
second.getCommitReadTimeMs());
second.start(true);
- // Wait until all messages have wrote back to commitLog and
consumeQueue.
- await().atMost(5000, TimeUnit.MILLISECONDS).until(new
Callable<Boolean>() {
+ // Wait until all messages have been written back to commitLog and
consumeQueue.
+ await().atMost(30000, TimeUnit.MILLISECONDS).until(new
Callable<Boolean>() {
@Override
public Boolean call() {
ConsumeQueue cq = (ConsumeQueue)
messageStore.getConsumeQueue(topic, 0);