Repository: incubator-rocketmq Updated Branches: refs/heads/master 9cb0a0cd4 -> b421d48c4
Fix https://issues.apache.org/jira/browse/ROCKETMQ-25 Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/b421d48c Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/b421d48c Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/b421d48c Branch: refs/heads/master Commit: b421d48c476e74a8c7bb8129979df1dc0cb5a5a5 Parents: 9cb0a0c Author: Zhanhui Li <[email protected]> Authored: Wed Jan 4 15:20:29 2017 +0800 Committer: Zhanhui Li <[email protected]> Committed: Wed Jan 4 15:20:29 2017 +0800 ---------------------------------------------------------------------- .../org/apache/rocketmq/client/impl/MQAdminImpl.java | 11 ++++++++++- .../tools/command/message/QueryMsgByKeySubCommand.java | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b421d48c/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index fb948b7..983e515 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -23,6 +23,9 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -285,6 +288,7 @@ public class MQAdminImpl { if (!brokerAddrs.isEmpty()) { final CountDownLatch countDownLatch = new CountDownLatch(brokerAddrs.size()); final List<QueryResult> queryResultList = new LinkedList<QueryResult>(); + final ReadWriteLock lock = new ReentrantReadWriteLock(false); for (String addr : brokerAddrs) { try { @@ -318,7 +322,12 @@ public class MQAdminImpl { MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true); QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers); - queryResultList.add(qr); + try { + lock.writeLock().lock(); + queryResultList.add(qr); + } finally { + lock.writeLock().unlock(); + } break; } default: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b421d48c/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java index 77dc6c4..bdc5f52 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java @@ -69,7 +69,7 @@ public class QueryMsgByKeySubCommand implements SubCommand { } } - void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key) + private void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key) throws MQClientException, InterruptedException { admin.start();
