http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java new file mode 100644 index 0000000..99204b0 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -0,0 +1,1996 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.client.impl; + +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.consumer.PullCallback; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.impl.consumer.PullResultExt; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.message.*; +import org.apache.rocketmq.common.namesrv.TopAddressing; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.*; +import org.apache.rocketmq.common.protocol.header.*; +import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.*; +import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.InvokeCallback; +import 
org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.exception.*; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.netty.ResponseFuture; +import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.slf4j.Logger; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + + +/** Client-side API implementation: builds RemotingCommand requests, sends them through a RemotingClient and turns the response codes into results or exceptions. + * @author shijia.wxr + */ +public class MQClientAPIImpl { + + private final static Logger log = ClientLogger.getLog(); + /** Read once from the system property org.apache.rocketmq.client.sendSmartMsg (default true); when true, sendMessage builds a SEND_MESSAGE_V2 request instead of SEND_MESSAGE. */ public static boolean sendSmartMsg = + Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true")); + + /** Static initializer: publishes MQVersion.CURRENT_VERSION as a system property under RemotingCommand.REMOTING_VERSION_KEY. */ static { + System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); + } + + private final RemotingClient remotingClient; + private final TopAddressing topAddressing; + private final ClientRemotingProcessor clientRemotingProcessor; + /** Last name server address string accepted by fetchNameServerAddr; null until the first successful fetch. */ private String nameSrvAddr = null; + private ClientConfig clientConfig; + + /** Stores the client config, creates the TopAddressing helper and a NettyRemotingClient, registers the RPC hook, then registers clientRemotingProcessor for each of the request codes listed in the body. */ public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor, + RPCHook rpcHook, final ClientConfig clientConfig) { + this.clientConfig = clientConfig; + topAddressing = new TopAddressing(MixAll.WS_ADDR, clientConfig.getUnitName()); + this.remotingClient = new NettyRemotingClient(nettyClientConfig, null); + this.clientRemotingProcessor = clientRemotingProcessor; + + this.remotingClient.registerRPCHook(rpcHook); + this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null); + + this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED,
this.clientRemotingProcessor, null); + + this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null); + + this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null); + + this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null); + + this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null); + } + + /** Returns the name server address list held by the remoting client. */ public List<String> getNameServerAddressList() { + return this.remotingClient.getNameServerAddressList(); + } + + /** Returns the underlying remoting client. */ public RemotingClient getRemotingClient() { + return remotingClient; + } + + /** Fetches the name server address string via topAddressing.fetchNSAddr(); when the result is non-null and differs from the cached value it is pushed to the remoting client and cached. Any exception is logged, and the cached value (null before the first successful fetch) is returned. */ public String fetchNameServerAddr() { + try { + String addrs = this.topAddressing.fetchNSAddr(); + if (addrs != null) { + if (!addrs.equals(this.nameSrvAddr)) { + log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs); + this.updateNameServerAddressList(addrs); + this.nameSrvAddr = addrs; + return nameSrvAddr; + } + } + } catch (Exception e) { + log.error("fetchNameServerAddr Exception", e); + } + return nameSrvAddr; + } + + /** Splits addrs on the semicolon character and hands the resulting list to the remoting client. */ public void updateNameServerAddressList(final String addrs) { + List<String> lst = new ArrayList<String>(); + String[] addrArray = addrs.split(";"); + /* String.split never returns null, so this guard always passes */ if (addrArray != null) { + for (String addr : addrArray) { + lst.add(addr); + } + + this.remotingClient.updateNameServerAddressList(lst); + } + } + + /** Starts the remoting client. */ public void start() { + this.remotingClient.start(); + } + + /** Shuts down the remoting client. */ public void shutdown() { + this.remotingClient.shutdown(); + } + + /** Sends UPDATE_AND_CREATE_SUBSCRIPTIONGROUP to addr with the encoded config as the request body; returns normally on SUCCESS, otherwise throws MQClientException carrying the response code and remark. */ public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null); + + byte[] body = RemotingSerializable.encode(config); +
request.setBody(body); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + + } + + /** Copies the fields of topicConfig (plus defaultTopic) into a CreateTopicRequestHeader and sends UPDATE_AND_CREATE_TOPIC to addr; returns normally on SUCCESS, otherwise throws MQClientException carrying the response code and remark. */ public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); + requestHeader.setTopic(topicConfig.getTopicName()); + requestHeader.setDefaultTopic(defaultTopic); + requestHeader.setReadQueueNums(topicConfig.getReadQueueNums()); + requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums()); + requestHeader.setPerm(topicConfig.getPerm()); + requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name()); + requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag()); + requestHeader.setOrder(topicConfig.isOrder()); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + /** Convenience overload: delegates to the 12-argument sendMessage with a null sendCallback, topicPublishInfo and instance and a retry count of 0. */ public SendResult sendMessage(// + final String addr, // 1 + final String brokerName, // 2 + final Message msg, // 3 + final SendMessageRequestHeader requestHeader, // 4 + final long timeoutMillis, // 5 + final CommunicationMode communicationMode, // 6 + final SendMessageContext context, // 7 + final DefaultMQProducerImpl producer // 8 + )
throws RemotingException, MQBrokerException, InterruptedException { + return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer); + } + + /** Builds a SEND_MESSAGE_V2 request (when sendSmartMsg is true) or a SEND_MESSAGE request, attaches the message body, then dispatches on communicationMode: ONEWAY and ASYNC return null, SYNC returns the SendResult produced by sendMessageSync. */ public SendResult sendMessage(// + final String addr, // 1 + final String brokerName, // 2 + final Message msg, // 3 + final SendMessageRequestHeader requestHeader, // 4 + final long timeoutMillis, // 5 + final CommunicationMode communicationMode, // 6 + final SendCallback sendCallback, // 7 + final TopicPublishInfo topicPublishInfo, // 8 + final MQClientInstance instance, // 9 + final int retryTimesWhenSendFailed, // 10 + final SendMessageContext context, // 11 + final DefaultMQProducerImpl producer // 12 + ) throws RemotingException, MQBrokerException, InterruptedException { + RemotingCommand request = null; + if (sendSmartMsg) { + SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); + request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2); + } else { + request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); + } + + request.setBody(msg.getBody()); + + switch (communicationMode) { + case ONEWAY: + this.remotingClient.invokeOneway(addr, request, timeoutMillis); + return null; + case ASYNC: + /* retry counter shared by the async send and onExceptionImpl */ final AtomicInteger times = new AtomicInteger(); + this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, context, producer); + return null; + case SYNC: + return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request); + default: + assert false; + break; + } + + return null; + } + + /** Invokes the request synchronously and converts the response with processSendResponse. */ private SendResult sendMessageSync(// + final String addr, // + final String brokerName, // + final Message msg, // + final long timeoutMillis, // + final RemotingCommand request// + ) throws RemotingException, MQBrokerException, InterruptedException { +
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); + assert response != null; + return this.processSendResponse(brokerName, msg, response); + } + + /** Invokes the request asynchronously. When a response arrives it is converted with processSendResponse, the after-send hook runs if a context is present, sendCallback.onSuccess is called if a callback was supplied, and producer.updateFaultItem is called with its last argument false. When processing throws, or no response arrived, updateFaultItem is called with its last argument true and handling is passed to onExceptionImpl. */ private void sendMessageAsync(// + final String addr, // + final String brokerName, // + final Message msg, // + final long timeoutMillis, // + final RemotingCommand request, // + final SendCallback sendCallback, // + final TopicPublishInfo topicPublishInfo, // + final MQClientInstance instance, // + final int retryTimesWhenSendFailed, // + final AtomicInteger times, // + final SendMessageContext context, // + final DefaultMQProducerImpl producer // + ) throws InterruptedException, RemotingException { + this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + RemotingCommand response = responseFuture.getResponseCommand(); + if (null == sendCallback && response != null) { + + try { + SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response); + if (context != null && sendResult != null) { + context.setSendResult(sendResult); + context.getProducer().executeSendMessageHookAfter(context); + } + } catch (Throwable e) { + // + } + + producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); + return; + } + + if (response != null) { + try { + SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response); + assert sendResult != null; + if (context != null) { + context.setSendResult(sendResult); + context.getProducer().executeSendMessageHookAfter(context); + } + + try { + sendCallback.onSuccess(sendResult); + } catch (Throwable e) { + } + + producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); + } catch (Exception e) { + producer.updateFaultItem(brokerName, System.currentTimeMillis() -
responseFuture.getBeginTimestamp(), true); + /* response-processing failure: handed over with needRetry=false, so no resend is attempted */ onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, e, context, false, producer); + } + } else { + producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); + if (!responseFuture.isSendRequestOK()) { + MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); + onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, ex, context, true, producer); + } else if (responseFuture.isTimeout()) { + MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", + responseFuture.getCause()); + onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, ex, context, true, producer); + } else { + MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause()); + onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, ex, context, true, producer); + } + } + } + }); + } + + + /** Failure handler for asynchronous sends. Increments curTimes; when needRetry is true and the new count does not exceed timesTotal, it selects another queue through the producer, gives the request a fresh opaque id and re-sends it asynchronously, calling itself again if that send throws. Otherwise it records the exception on the context (running the after-send hook) and calls sendCallback.onException. NOTE(review): the callers visible in this file pass 0L as timeoutMillis, and that value is forwarded to the resend; confirm this is intended. */ private void onExceptionImpl(final String brokerName, // + final Message msg, // + final long timeoutMillis, // + final RemotingCommand request, // + final SendCallback sendCallback, // + final TopicPublishInfo topicPublishInfo, // + final MQClientInstance instance, // + final int timesTotal, // + final AtomicInteger curTimes, // + final Exception e, // + final SendMessageContext context, // + final boolean needRetry, // + final DefaultMQProducerImpl producer // 12 + ) { + int tmp = curTimes.incrementAndGet(); + if (needRetry && tmp <= timesTotal) { + MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName); + String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName()); + log.info("async send msg by
retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, + tmpmq.getBrokerName()); + try { + /* a fresh opaque id is assigned before the same request object is sent again */ request.setOpaque(RemotingCommand.createNewRequestId()); + sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, + timesTotal, curTimes, context, producer); + } catch (InterruptedException e1) { + onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + context, false, producer); + } catch (RemotingConnectException e1) { + /* NOTE(review): passes brokerName (the broker of the previous attempt) rather than tmpmq.getBrokerName(), the broker that just failed; confirm intended */ producer.updateFaultItem(brokerName, 3000, true); + onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + context, true, producer); + } catch (RemotingTooMuchRequestException e1) { + onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + context, false, producer); + } catch (RemotingException e1) { + /* NOTE(review): same as above, the previous brokerName is passed here; confirm intended */ producer.updateFaultItem(brokerName, 3000, true); + onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + context, true, producer); + } + } else { + if (context != null) { + context.setException(e); + context.getProducer().executeSendMessageHookAfter(context); + } + try { + sendCallback.onException(e); + } catch (Exception e2) { + } + } + } + + + /** Converts a send response into a SendResult. SUCCESS, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT and SLAVE_NOT_AVAILABLE are mapped to the matching SendStatus; the SendMessageResponseHeader is decoded and the result is filled with the queue, message ids, queue offset, transaction id, region id and trace switch. Any other response code throws MQBrokerException with the code and remark. */ private SendResult processSendResponse(// + final String brokerName, // + final Message msg, // + final RemotingCommand response// + ) throws MQBrokerException, RemotingCommandException { + switch (response.getCode()) { + case ResponseCode.FLUSH_DISK_TIMEOUT: + case ResponseCode.FLUSH_SLAVE_TIMEOUT: + case ResponseCode.SLAVE_NOT_AVAILABLE: { + // TODO LOG + } + case ResponseCode.SUCCESS: { + SendStatus sendStatus = SendStatus.SEND_OK; + switch (response.getCode()) { + case
ResponseCode.FLUSH_DISK_TIMEOUT: + sendStatus = SendStatus.FLUSH_DISK_TIMEOUT; + break; + case ResponseCode.FLUSH_SLAVE_TIMEOUT: + sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT; + break; + case ResponseCode.SLAVE_NOT_AVAILABLE: + sendStatus = SendStatus.SLAVE_NOT_AVAILABLE; + break; + case ResponseCode.SUCCESS: + sendStatus = SendStatus.SEND_OK; + break; + default: + assert false; + break; + } + + SendMessageResponseHeader responseHeader = + (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class); + + MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId()); + + SendResult sendResult = new SendResult(sendStatus, + MessageClientIDSetter.getUniqID(msg), + responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); + sendResult.setTransactionId(responseHeader.getTransactionId()); + /* NOTE(review): getExtFields() is dereferenced without a null check; confirm it cannot be null for these response codes */ String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION); + String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH); + /* a missing or empty region falls back to MixAll.DEFAULT_TRACE_REGION_ID */ if (regionId == null || regionId.isEmpty()) { + regionId = MixAll.DEFAULT_TRACE_REGION_ID; + } + /* tracing stays on unless the response field is exactly the string false */ if (traceOn != null && traceOn.equals("false")) { + sendResult.setTraceOn(false); + } else { + sendResult.setTraceOn(true); + } + sendResult.setRegionId(regionId); + return sendResult; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + /** Builds a PULL_MESSAGE request from requestHeader and dispatches on communicationMode: ASYNC hands off to pullMessageAsync and returns null, SYNC returns the PullResult from pullMessageSync, ONEWAY is rejected with assert false and returns null. */ public PullResult pullMessage(// + final String addr, // + final PullMessageRequestHeader requestHeader, // + final long timeoutMillis, // + final CommunicationMode communicationMode, // + final PullCallback pullCallback// + ) throws RemotingException, MQBrokerException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); + + switch (communicationMode) { + case ONEWAY: + assert false; + return null; + case ASYNC: + this.pullMessageAsync(addr, request, timeoutMillis,
pullCallback); + return null; + case SYNC: + return this.pullMessageSync(addr, request, timeoutMillis); + default: + assert false; + break; + } + + return null; + } + + + /** Invokes the pull request asynchronously. A received response is converted with processPullResponse and passed to pullCallback.onSuccess, or to pullCallback.onException if conversion throws. With no response, pullCallback.onException receives an MQClientException describing a failed send, a timeout or an unknown reason. */ private void pullMessageAsync(// + final String addr, // 1 + final RemotingCommand request, // + final long timeoutMillis, // + final PullCallback pullCallback// + ) throws RemotingException, InterruptedException { + this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + RemotingCommand response = responseFuture.getResponseCommand(); + if (response != null) { + try { + PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response); + assert pullResult != null; + pullCallback.onSuccess(pullResult); + } catch (Exception e) { + pullCallback.onException(e); + } + } else { + if (!responseFuture.isSendRequestOK()) { + pullCallback.onException(new MQClientException("send request failed", responseFuture.getCause())); + } else if (responseFuture.isTimeout()) { + pullCallback.onException(new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", + responseFuture.getCause())); + } else { + pullCallback.onException(new MQClientException("unknow reseaon", responseFuture.getCause())); + } + } + } + }); + } + + /** Invokes the pull request synchronously and converts the response with processPullResponse. */ private PullResult pullMessageSync(// + final String addr, // 1 + final RemotingCommand request, // 2 + final long timeoutMillis// 3 + ) throws RemotingException, InterruptedException, MQBrokerException { + RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); + assert response != null; + return this.processPullResponse(response); + } + + /** Maps the response code to a PullStatus (any unlisted code throws MQBrokerException), decodes the PullMessageResponseHeader and returns a PullResultExt carrying the next/min/max offsets, the suggested broker id and the raw response body; the message-list argument is passed as null. */ private PullResult processPullResponse(final RemotingCommand response) throws MQBrokerException, RemotingCommandException { + PullStatus pullStatus = PullStatus.NO_NEW_MSG; + switch (response.getCode()) { + case ResponseCode.SUCCESS: + pullStatus = PullStatus.FOUND; + break; + case
ResponseCode.PULL_NOT_FOUND: + pullStatus = PullStatus.NO_NEW_MSG; + break; + case ResponseCode.PULL_RETRY_IMMEDIATELY: + pullStatus = PullStatus.NO_MATCHED_MSG; + break; + case ResponseCode.PULL_OFFSET_MOVED: + pullStatus = PullStatus.OFFSET_ILLEGAL; + break; + + default: + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + PullMessageResponseHeader responseHeader = + (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); + + return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(), + responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody()); + } + + /** Sends VIEW_MESSAGE_BY_ID for the given physical offset and, on SUCCESS, decodes the response body into a MessageExt; any other response code throws MQBrokerException with the code and remark. */ public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException { + ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader(); + requestHeader.setOffset(phyoffset); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody()); + MessageExt messageExt = MessageDecoder.clientDecode(byteBuffer, true); + return messageExt; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + /** Sends SEARCH_OFFSET_BY_TIMESTAMP for the given topic, queue id and timestamp and returns the offset from the response header on SUCCESS; any other response code throws MQBrokerException with the code and remark. */ public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException { + SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setQueueId(queueId); +
requestHeader.setTimestamp(timestamp); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + SearchOffsetResponseHeader responseHeader = + (SearchOffsetResponseHeader) response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); + return responseHeader.getOffset(); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + /** Sends GET_MAX_OFFSET for the given topic and queue id and returns the offset from the response header on SUCCESS; any other response code throws MQBrokerException with the code and remark. */ public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException { + GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setQueueId(queueId); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + GetMaxOffsetResponseHeader responseHeader = + (GetMaxOffsetResponseHeader) response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class); + + return responseHeader.getOffset(); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + /** Sends GET_CONSUMER_LIST_BY_GROUP for consumerGroup and returns the decoded consumer id list when the response is SUCCESS with a non-null body. A SUCCESS response with a null body falls through and throws MQBrokerException, as does any other response code. */ public List<String> getConsumerIdListByGroup(// + final String addr, // + final String consumerGroup, // + final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + MQBrokerException, InterruptedException { + GetConsumerListByGroupRequestHeader
requestHeader = new GetConsumerListByGroupRequestHeader(); + requestHeader.setConsumerGroup(consumerGroup); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + if (response.getBody() != null) { + GetConsumerListByGroupResponseBody body = + GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class); + return body.getConsumerIdList(); + } + } /* no return when the body is null: control falls through to default and reaches the throw below */ + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + /** Sends GET_MIN_OFFSET for the given topic and queue id and returns the offset from the response header on SUCCESS; any other response code throws MQBrokerException with the code and remark. */ public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException { + GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setQueueId(queueId); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + GetMinOffsetResponseHeader responseHeader = + (GetMinOffsetResponseHeader) response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class); + + return responseHeader.getOffset(); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + /** Sends GET_EARLIEST_MSG_STORETIME for the given topic and queue id and returns the timestamp from the response header on SUCCESS; any other response code throws MQBrokerException with the code and remark. */ public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException { +
GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setQueueId(queueId); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + GetEarliestMsgStoretimeResponseHeader responseHeader = + (GetEarliestMsgStoretimeResponseHeader) response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class); + + return responseHeader.getTimestamp(); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + /** Sends QUERY_CONSUMER_OFFSET with the supplied header and returns the offset from the response header on SUCCESS; any other response code throws MQBrokerException with the code and remark. */ public long queryConsumerOffset(// + final String addr, // + final QueryConsumerOffsetRequestHeader requestHeader, // + final long timeoutMillis// + ) throws RemotingException, MQBrokerException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + QueryConsumerOffsetResponseHeader responseHeader = + (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class); + + return responseHeader.getOffset(); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + /** Sends UPDATE_CONSUMER_OFFSET synchronously with the supplied header; returns normally on SUCCESS, otherwise throws MQBrokerException with the response code and remark. */ public void updateConsumerOffset(// + final String addr, // + final UpdateConsumerOffsetRequestHeader requestHeader, // + final long timeoutMillis// + ) throws RemotingException, MQBrokerException, InterruptedException
{ + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + /** Sends UPDATE_CONSUMER_OFFSET with the supplied header using invokeOneway, so no response is read. */ public void updateConsumerOffsetOneway(// + final String addr, // + final UpdateConsumerOffsetRequestHeader requestHeader, // + final long timeoutMillis// + ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException, + InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); + + this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); + } + + + /** Sends a HEART_BEAT request whose body is the encoded heartbeatData, directly to addr; returns normally on SUCCESS, otherwise throws MQBrokerException with the response code and remark. */ public void sendHearbeat(// + final String addr, // + final HeartbeatData heartbeatData, // + final long timeoutMillis// + ) throws RemotingException, MQBrokerException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); + + request.setBody(heartbeatData.encode()); + RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + /** Sends UNREGISTER_CLIENT with the client id, producer group and consumer group directly to addr; returns normally on SUCCESS, otherwise throws MQBrokerException with the response code and remark. */ public void unregisterClient(// + final String addr, // + final String clientID, // + final String producerGroup, // + final String consumerGroup, // + final long timeoutMillis// + ) throws RemotingException, MQBrokerException, InterruptedException { + final UnregisterClientRequestHeader
requestHeader = new UnregisterClientRequestHeader(); + requestHeader.setClientID(clientID); + requestHeader.setProducerGroup(producerGroup); + requestHeader.setConsumerGroup(consumerGroup); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + /** Sends END_TRANSACTION with the supplied header and remark using invokeOneway, so no response is read. */ public void endTransactionOneway(// + final String addr, // + final EndTransactionRequestHeader requestHeader, // + final String remark, // + final long timeoutMillis// + ) throws RemotingException, MQBrokerException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader); + + request.setRemark(remark); + this.remotingClient.invokeOneway(addr, request, timeoutMillis); + } + + + /** Sends QUERY_MESSAGE asynchronously with the supplied header, adding isUnqiueKey as the MixAll.UNIQUE_MSG_QUERY_FLAG ext field; the response is delivered to invokeCallback. isUnqiueKey is a boxed Boolean and is dereferenced, so null is not accepted. */ public void queryMessage( + final String addr, + final QueryMessageRequestHeader requestHeader, + final long timeoutMillis, + final InvokeCallback invokeCallback, + final Boolean isUnqiueKey + ) throws RemotingException, MQBrokerException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, requestHeader); + request.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, isUnqiueKey.toString()); + this.remotingClient.invokeAsync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis, + invokeCallback); + } + + + /** Sends a HEART_BEAT request whose body is the encoded heartbeat, directly to addr, and returns whether the response code is SUCCESS. */ public boolean registerClient(final String addr, final HeartbeatData heartbeat, final long timeoutMillis) + throws RemotingException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); + + request.setBody(heartbeat.encode()); + RemotingCommand response =
this.remotingClient.invokeSync(addr, request, timeoutMillis); + return response.getCode() == ResponseCode.SUCCESS; + } + + + public void consumerSendMessageBack( + final String addr, + final MessageExt msg, + final String consumerGroup, + final int delayLevel, + final long timeoutMillis, + final int maxConsumeRetryTimes + ) throws RemotingException, MQBrokerException, InterruptedException { + ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader); + + requestHeader.setGroup(consumerGroup); + requestHeader.setOriginTopic(msg.getTopic()); + requestHeader.setOffset(msg.getCommitLogOffset()); + requestHeader.setDelayLevel(delayLevel); + requestHeader.setOriginMsgId(msg.getMsgId()); + requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + public Set<MessageQueue> lockBatchMQ(// + final String addr, // + final LockBatchRequestBody requestBody, // + final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null); + + request.setBody(requestBody.encode()); + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class); + Set<MessageQueue> messageQueues = 
responseBody.getLockOKMQSet(); + return messageQueues; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + public void unlockBatchMQ(// + final String addr, // + final UnlockBatchRequestBody requestBody, // + final long timeoutMillis, // + final boolean oneway// + ) throws RemotingException, MQBrokerException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null); + + request.setBody(requestBody.encode()); + + if (oneway) { + this.remotingClient.invokeOneway(addr, request, timeoutMillis); + } else { + RemotingCommand response = this.remotingClient + .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + } + + + public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, final long timeoutMillis) throws InterruptedException, + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + GetTopicStatsInfoRequestHeader requestHeader = new GetTopicStatsInfoRequestHeader(); + requestHeader.setTopic(topic); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + TopicStatsTable topicStatsTable = TopicStatsTable.decode(response.getBody(), TopicStatsTable.class); + return topicStatsTable; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + public ConsumeStats getConsumeStats(final String addr, final String 
consumerGroup, final long timeoutMillis) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, + MQBrokerException { + return getConsumeStats(addr, consumerGroup, null, timeoutMillis); + } + + + public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, final long timeoutMillis) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, + MQBrokerException { + GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader(); + requestHeader.setConsumerGroup(consumerGroup); + requestHeader.setTopic(topic); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + ConsumeStats consumeStats = ConsumeStats.decode(response.getBody(), ConsumeStats.class); + return consumeStats; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup, final long timeoutMillis) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, + MQBrokerException { + GetProducerConnectionListRequestHeader requestHeader = new GetProducerConnectionListRequestHeader(); + requestHeader.setProducerGroup(producerGroup); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_PRODUCER_CONNECTION_LIST, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + switch (response.getCode()) { + case 
ResponseCode.SUCCESS: { + return ProducerConnection.decode(response.getBody(), ProducerConnection.class); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup, final long timeoutMillis) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, + MQBrokerException { + GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader(); + requestHeader.setConsumerGroup(consumerGroup); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + ConsumerConnection consumerConnection = ConsumerConnection.decode(response.getBody(), ConsumerConnection.class); + return consumerConnection; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + public KVTable getBrokerRuntimeInfo(final String addr, final long timeoutMillis) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return KVTable.decode(response.getBody(), KVTable.class); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + public void updateBrokerConfig(final 
String addr, final Properties properties, final long timeoutMillis) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, + MQBrokerException, UnsupportedEncodingException { + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_BROKER_CONFIG, null); + + String str = MixAll.properties2String(properties); + if (str != null && str.length() > 0) { + request.setBody(str.getBytes(MixAll.DEFAULT_CHARSET)); + RemotingCommand response = this.remotingClient + .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + } + + + public Properties getBrokerConfig(final String addr, final long timeoutMillis) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, + MQBrokerException, UnsupportedEncodingException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONFIG, null); + + RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return MixAll.string2Properties(new String(response.getBody(), MixAll.DEFAULT_CHARSET)); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public ClusterInfo getBrokerClusterInfo(final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException, MQBrokerException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null); + + RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); + assert response != 
null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + ClusterInfo responseBody = ClusterInfo.decode(response.getBody(), ClusterInfo.class); + return responseBody; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis) + throws RemotingException, MQClientException, InterruptedException { + GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); + requestHeader.setTopic(topic); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.TOPIC_NOT_EXIST: { + // TODO LOG + break; + } + case ResponseCode.SUCCESS: { + byte[] body = response.getBody(); + if (body != null) { + return TopicRouteData.decode(body, TopicRouteData.class); + } + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis) + throws RemotingException, MQClientException, InterruptedException { + GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); + requestHeader.setTopic(topic); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.TOPIC_NOT_EXIST: { + if (!topic.equals(MixAll.DEFAULT_TOPIC)) + log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic); + break; + } + case ResponseCode.SUCCESS: { + byte[] body = response.getBody(); + if (body != null) { + 
return TopicRouteData.decode(body, TopicRouteData.class); + } + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public TopicList getTopicListFromNameServer(final long timeoutMillis) + throws RemotingException, MQClientException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER, null); + + RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + byte[] body = response.getBody(); + if (body != null) { + TopicList topicList = TopicList.decode(body, TopicList.class); + return topicList; + } + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, final long timeoutMillis) throws RemotingCommandException, + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { + WipeWritePermOfBrokerRequestHeader requestHeader = new WipeWritePermOfBrokerRequestHeader(); + requestHeader.setBrokerName(brokerName); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.WIPE_WRITE_PERM_OF_BROKER, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + WipeWritePermOfBrokerResponseHeader responseHeader = + (WipeWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class); + return responseHeader.getWipeTopicCount(); + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public void deleteTopicInBroker(final String addr, final String topic, final 
long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader(); + requestHeader.setTopic(topic); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public void deleteTopicInNameServer(final String addr, final String topic, final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader(); + requestHeader.setTopic(topic); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader(); + requestHeader.setGroupName(groupName); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, requestHeader); + + RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public String getKVConfigValue(final String namespace, final String key, final long timeoutMillis) + throws RemotingException, MQClientException, InterruptedException { + GetKVConfigRequestHeader requestHeader = new GetKVConfigRequestHeader(); + requestHeader.setNamespace(namespace); + requestHeader.setKey(key); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + GetKVConfigResponseHeader responseHeader = + (GetKVConfigResponseHeader) response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class); + return responseHeader.getValue(); + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public void putKVConfigValue(final String namespace, final String key, final String value, final long timeoutMillis) + throws RemotingException, MQClientException, InterruptedException { + PutKVConfigRequestHeader requestHeader = new PutKVConfigRequestHeader(); + requestHeader.setNamespace(namespace); + requestHeader.setKey(key); + requestHeader.setValue(value); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUT_KV_CONFIG, requestHeader); + + List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); + if (nameServerAddressList != null) { + RemotingCommand errResponse = null; + for (String namesrvAddr : nameServerAddressList) { + RemotingCommand response = 
this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + break; + } + default: + errResponse = response; + } + } + + if (errResponse != null) { + throw new MQClientException(errResponse.getCode(), errResponse.getRemark()); + } + } + } + + + public void deleteKVConfigValue(final String namespace, final String key, final long timeoutMillis) + throws RemotingException, MQClientException, InterruptedException { + DeleteKVConfigRequestHeader requestHeader = new DeleteKVConfigRequestHeader(); + requestHeader.setNamespace(namespace); + requestHeader.setKey(key); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_KV_CONFIG, requestHeader); + + List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); + if (nameServerAddressList != null) { + RemotingCommand errResponse = null; + for (String namesrvAddr : nameServerAddressList) { + RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + break; + } + default: + errResponse = response; + } + } + if (errResponse != null) { + throw new MQClientException(errResponse.getCode(), errResponse.getRemark()); + } + } + } + + + public KVTable getKVListByNamespace(final String namespace, final long timeoutMillis) + throws RemotingException, MQClientException, InterruptedException { + GetKVListByNamespaceRequestHeader requestHeader = new GetKVListByNamespaceRequestHeader(); + requestHeader.setNamespace(namespace); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_KVLIST_BY_NAMESPACE, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return 
KVTable.decode(response.getBody(), KVTable.class); + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group, + final long timestamp, final boolean isForce, final long timeoutMillis) + throws RemotingException, MQClientException, InterruptedException { + return invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, false); + } + + + public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group, + final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC) + throws RemotingException, MQClientException, InterruptedException { + ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setGroup(group); + requestHeader.setTimestamp(timestamp); + requestHeader.setForce(isForce); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader); + if (isC) { + request.setLanguage(LanguageCode.CPP); + } + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + if (response.getBody() != null) { + ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class); + return body.getOffsetTable(); + } + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group, + final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + 
GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setGroup(group); + requestHeader.setClientAddr(clientAddr); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + if (response.getBody() != null) { + GetConsumerStatusBody body = GetConsumerStatusBody.decode(response.getBody(), GetConsumerStatusBody.class); + return body.getConsumerTable(); + } + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public GroupList queryTopicConsumeByWho(final String addr, final String topic, final long timeoutMillis) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, + MQBrokerException { + QueryTopicConsumeByWhoRequestHeader requestHeader = new QueryTopicConsumeByWhoRequestHeader(); + requestHeader.setTopic(topic); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + GroupList groupList = GroupList.decode(response.getBody(), GroupList.class); + return groupList; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group, final long timeoutMillis) + throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, InterruptedException, + MQBrokerException { + QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setGroup(group); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + QueryConsumeTimeSpanBody consumeTimeSpanBody = GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class); + return consumeTimeSpanBody.getConsumeTimeSpanSet(); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + public TopicList getTopicsByCluster(final String cluster, final long timeoutMillis) + throws RemotingException, MQClientException, InterruptedException { + GetTopicsByClusterRequestHeader requestHeader = new GetTopicsByClusterRequestHeader(); + requestHeader.setCluster(cluster); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPICS_BY_CLUSTER, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + byte[] body = response.getBody(); + if (body != null) { + TopicList topicList = TopicList.decode(body, TopicList.class); + return topicList; + } + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public void registerMessageFilterClass(final String addr, // + final String consumerGroup, // + final String topic, // + final String className, // + final int classCRC, // + final byte[] classBody, // + final long timeoutMillis) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, + InterruptedException, MQBrokerException { + RegisterMessageFilterClassRequestHeader requestHeader = new RegisterMessageFilterClassRequestHeader(); + requestHeader.setConsumerGroup(consumerGroup); + requestHeader.setClassName(className); + requestHeader.setTopic(topic); + requestHeader.setClassCRC(classCRC); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_MESSAGE_FILTER_CLASS, requestHeader); + request.setBody(classBody); + RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + + public TopicList getSystemTopicList(final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null); + + RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + byte[] body = response.getBody(); + if (body != null) { + TopicList topicList = TopicList.decode(response.getBody(), TopicList.class); + if (topicList.getTopicList() != null && !topicList.getTopicList().isEmpty() + && !UtilAll.isBlank(topicList.getBrokerAddr())) { + TopicList tmp = getSystemTopicListFromBroker(topicList.getBrokerAddr(), timeoutMillis); + if (tmp.getTopicList() != null && !tmp.getTopicList().isEmpty()) { + topicList.getTopicList().addAll(tmp.getTopicList()); + } + } + return topicList; + } + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public TopicList getSystemTopicListFromBroker(final String addr, final long timeoutMillis) + throws RemotingException, MQClientException, 
InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER, null); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + byte[] body = response.getBody(); + if (body != null) { + TopicList topicList = TopicList.decode(body, TopicList.class); + return topicList; + } + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public boolean cleanExpiredConsumeQueue(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, null); + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return true; + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public boolean cleanUnusedTopicByAddr(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null); + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return true; + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + public 
ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId, boolean jstack, + final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + GetConsumerRunningInfoRequestHeader requestHeader = new GetConsumerRunningInfoRequestHeader(); + requestHeader.setConsumerGroup(consumerGroup); + requestHeader.setClientId(clientId); + requestHeader.setJstackEnable(jstack); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + byte[] body = response.getBody(); + if (body != null) { + ConsumerRunningInfo info = ConsumerRunningInfo.decode(body, ConsumerRunningInfo.class); + return info; + } + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + /** Sends a CONSUME_MESSAGE_DIRECTLY request carrying the consumer group, client id and message id, and waits synchronously for the reply. The target address is whatever MixAll.brokerVIPChannel returns for addr and the client's VIP-channel setting. @param addr address passed to MixAll.brokerVIPChannel @param consumerGroup value for the header's consumerGroup field @param clientId value for the header's clientId field @param msgId value for the header's msgId field @param timeoutMillis timeout handed to invokeSync @return the ConsumeMessageDirectlyResult decoded from the response body when the code is SUCCESS and the body is non-null @throws MQClientException carrying the response code and remark for any other response, including SUCCESS with a null body */ public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, // + String consumerGroup, // + String clientId, // + String msgId, // + final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + ConsumeMessageDirectlyResultRequestHeader requestHeader = new ConsumeMessageDirectlyResultRequestHeader(); + requestHeader.setConsumerGroup(consumerGroup); + requestHeader.setClientId(clientId); + requestHeader.setMsgId(msgId); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + byte[] body = response.getBody(); + if (body != null) { + 
ConsumeMessageDirectlyResult info = ConsumeMessageDirectlyResult.decode(body, ConsumeMessageDirectlyResult.class); + return info; + } + } /* no break here: SUCCESS with a null body falls through to default and reaches the throw below */ + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + /** Sends a QUERY_CORRECTION_OFFSET request for a topic and compare group and waits synchronously for the reply. The target address is whatever MixAll.brokerVIPChannel returns for addr and the client's VIP-channel setting. @param addr address passed to MixAll.brokerVIPChannel @param topic value for the header's topic field @param group value for the header's compareGroup field @param filterGroup optional set of names; when non-null they are joined with commas, in the set's iteration order, into the header's filterGroups field (an empty set yields an empty string); when null the field is not set @param timeoutMillis timeout handed to invokeSync @return the result of getCorrectionOffsets() on the decoded QueryCorrectionOffsetBody when the code is SUCCESS and the body is non-null @throws MQClientException carrying the response code and remark for any other response, including SUCCESS with a null body */ public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic, final String group, Set<String> filterGroup, + long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + InterruptedException { + QueryCorrectionOffsetHeader requestHeader = new QueryCorrectionOffsetHeader(); + requestHeader.setCompareGroup(group); + requestHeader.setTopic(topic); + if (filterGroup != null) { + StringBuilder sb = new StringBuilder(); + String splitor = ""; + for (String s : filterGroup) { + sb.append(splitor).append(s); + splitor = ","; + } + requestHeader.setFilterGroups(sb.toString()); + } + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CORRECTION_OFFSET, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + if (response.getBody() != null) { + QueryCorrectionOffsetBody body = QueryCorrectionOffsetBody.decode(response.getBody(), QueryCorrectionOffsetBody.class); + return body.getCorrectionOffsets(); + } + } /* no break here: SUCCESS with a null body falls through to default and reaches the throw below */ + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + /** Sends a GET_UNIT_TOPIC_LIST request, created with a null header argument, and waits synchronously for the reply. invokeSync is called with a null address. NOTE(review): presumably a null address lets the remoting client pick a name server itself; confirm against the remoting client implementation. @param containRetry when false, every topic whose name starts with MixAll.RETRY_GROUP_TOPIC_PREFIX is removed from the decoded list before it is returned @param timeoutMillis timeout handed to invokeSync @return the decoded TopicList when the code is SUCCESS and the body is non-null @throws MQClientException carrying the response code and remark for any other response, including SUCCESS with a null body */ public TopicList getUnitTopicList(final boolean containRetry, final long timeoutMillis) + throws RemotingException, MQClientException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_UNIT_TOPIC_LIST, null); + + RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); + assert response != null; + switch 
(response.getCode()) { + case ResponseCode.SUCCESS: { + byte[] body = response.getBody(); + if (body != null) { + TopicList topicList = TopicList.decode(response.getBody(), TopicList.class); + if (!containRetry) { + Iterator<String> it = topicList.getTopicList().iterator(); + while (it.hasNext()) { + String topic = it.next(); + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) + it.remove(); + } + } + + return topicList; + } + } /* no break here: SUCCESS with a null body falls through to default and reaches the throw below */ + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + /** Sends a GET_HAS_UNIT_SUB_TOPIC_LIST request, created with a null header argument, and waits synchronously for the reply. invokeSync is called with a null address. NOTE(review): presumably a null address lets the remoting client pick a name server itself; confirm against the remoting client implementation. @param containRetry when false, every topic whose name starts with MixAll.RETRY_GROUP_TOPIC_PREFIX is removed from the decoded list before it is returned @param timeoutMillis timeout handed to invokeSync @return the decoded TopicList when the code is SUCCESS and the body is non-null @throws MQClientException carrying the response code and remark for any other response, including SUCCESS with a null body */ public TopicList getHasUnitSubTopicList(final boolean containRetry, final long timeoutMillis) + throws RemotingException, MQClientException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST, null); + + RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + byte[] body = response.getBody(); + if (body != null) { + TopicList topicList = TopicList.decode(response.getBody(), TopicList.class); + if (!containRetry) { + Iterator<String> it = topicList.getTopicList().iterator(); + while (it.hasNext()) { + String topic = it.next(); + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) + it.remove(); + } + } + return topicList; + } + } /* no break here: SUCCESS with a null body falls through to default and reaches the throw below */ + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + /** Sends a GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST request, created with a null header argument, and waits synchronously for the reply. invokeSync is called with a null address. NOTE(review): presumably a null address lets the remoting client pick a name server itself; confirm against the remoting client implementation. @param containRetry when false, every topic whose name starts with MixAll.RETRY_GROUP_TOPIC_PREFIX is removed from the decoded list before it is returned @param timeoutMillis timeout handed to invokeSync @return the decoded TopicList when the code is SUCCESS and the body is non-null @throws MQClientException carrying the response code and remark for any other response, including SUCCESS with a null body */ public TopicList getHasUnitSubUnUnitTopicList(final boolean containRetry, final long timeoutMillis) + throws RemotingException, MQClientException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST, null); + + RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + byte[] body = 
response.getBody(); + if (body != null) { + TopicList topicList = TopicList.decode(response.getBody(), TopicList.class); + if (!containRetry) { + Iterator<String> it = topicList.getTopicList().iterator(); + while (it.hasNext()) { + String topic = it.next(); + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) + it.remove(); + } + } + return topicList; + } + } /* no break here: SUCCESS with a null body falls through to default and reaches the throw below */ + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + /** Sends a CLONE_GROUP_OFFSET request built from the source group, destination group, topic and offline flag, and waits synchronously for the reply. The target address is whatever MixAll.brokerVIPChannel returns for addr and the client's VIP-channel setting. @param addr address passed to MixAll.brokerVIPChannel @param srcGroup value for the header's srcGroup field @param destGroup value for the header's destGroup field @param topic value for the header's topic field @param isOffline value for the header's offline field @param timeoutMillis timeout handed to invokeSync @throws MQClientException carrying the response code and remark when the response code is not SUCCESS */ public void cloneGroupOffset(final String addr, final String srcGroup, final String destGroup, final String topic, + final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + CloneGroupOffsetRequestHeader requestHeader = new CloneGroupOffsetRequestHeader(); + requestHeader.setSrcGroup(srcGroup); + requestHeader.setDestGroup(destGroup); + requestHeader.setTopic(topic); + requestHeader.setOffline(isOffline); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLONE_GROUP_OFFSET, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + + public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey, long timeoutMillis) + throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + InterruptedException { + ViewBrokerStatsDataRequestHeader request
<TRUNCATED>
