http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java deleted file mode 100644 index 3d5ba28..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java +++ /dev/null @@ -1,1996 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.impl; - -import com.alibaba.rocketmq.client.ClientConfig; -import com.alibaba.rocketmq.client.consumer.PullCallback; -import com.alibaba.rocketmq.client.consumer.PullResult; -import com.alibaba.rocketmq.client.consumer.PullStatus; -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.hook.SendMessageContext; -import com.alibaba.rocketmq.client.impl.consumer.PullResultExt; -import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; -import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl; -import com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.client.producer.SendCallback; -import com.alibaba.rocketmq.client.producer.SendResult; -import com.alibaba.rocketmq.client.producer.SendStatus; -import com.alibaba.rocketmq.common.MQVersion; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.TopicConfig; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.admin.ConsumeStats; -import com.alibaba.rocketmq.common.admin.TopicStatsTable; -import com.alibaba.rocketmq.common.message.*; -import com.alibaba.rocketmq.common.namesrv.TopAddressing; -import com.alibaba.rocketmq.common.protocol.RequestCode; -import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.body.*; -import com.alibaba.rocketmq.common.protocol.header.*; -import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.namesrv.*; -import com.alibaba.rocketmq.common.protocol.heartbeat.HeartbeatData; -import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; -import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig; -import 
com.alibaba.rocketmq.remoting.InvokeCallback; -import com.alibaba.rocketmq.remoting.RPCHook; -import com.alibaba.rocketmq.remoting.RemotingClient; -import com.alibaba.rocketmq.remoting.exception.*; -import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; -import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient; -import com.alibaba.rocketmq.remoting.netty.ResponseFuture; -import com.alibaba.rocketmq.remoting.protocol.LanguageCode; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; -import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; -import org.slf4j.Logger; - -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; - - -/** - * @author shijia.wxr - */ -public class MQClientAPIImpl { - - private final static Logger log = ClientLogger.getLog(); - public static boolean sendSmartMsg = - Boolean.parseBoolean(System.getProperty("com.alibaba.rocketmq.client.sendSmartMsg", "true")); - - static { - System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); - } - - private final RemotingClient remotingClient; - private final TopAddressing topAddressing; - private final ClientRemotingProcessor clientRemotingProcessor; - private String nameSrvAddr = null; - private ClientConfig clientConfig; - - public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor, - RPCHook rpcHook, final ClientConfig clientConfig) { - this.clientConfig = clientConfig; - topAddressing = new TopAddressing(MixAll.WS_ADDR, clientConfig.getUnitName()); - this.remotingClient = new NettyRemotingClient(nettyClientConfig, null); - this.clientRemotingProcessor = clientRemotingProcessor; - - this.remotingClient.registerRPCHook(rpcHook); - this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null); - - 
this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null); - - this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null); - - this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null); - - this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null); - - this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null); - } - - public List<String> getNameServerAddressList() { - return this.remotingClient.getNameServerAddressList(); - } - - public RemotingClient getRemotingClient() { - return remotingClient; - } - - public String fetchNameServerAddr() { - try { - String addrs = this.topAddressing.fetchNSAddr(); - if (addrs != null) { - if (!addrs.equals(this.nameSrvAddr)) { - log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs); - this.updateNameServerAddressList(addrs); - this.nameSrvAddr = addrs; - return nameSrvAddr; - } - } - } catch (Exception e) { - log.error("fetchNameServerAddr Exception", e); - } - return nameSrvAddr; - } - - public void updateNameServerAddressList(final String addrs) { - List<String> lst = new ArrayList<String>(); - String[] addrArray = addrs.split(";"); - if (addrArray != null) { - for (String addr : addrArray) { - lst.add(addr); - } - - this.remotingClient.updateNameServerAddressList(lst); - } - } - - public void start() { - this.remotingClient.start(); - } - - public void shutdown() { - this.remotingClient.shutdown(); - } - - public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null); - - byte[] body = RemotingSerializable.encode(config); - request.setBody(body); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - - } - - public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); - requestHeader.setTopic(topicConfig.getTopicName()); - requestHeader.setDefaultTopic(defaultTopic); - requestHeader.setReadQueueNums(topicConfig.getReadQueueNums()); - requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums()); - requestHeader.setPerm(topicConfig.getPerm()); - requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name()); - requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag()); - requestHeader.setOrder(topicConfig.isOrder()); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - public SendResult sendMessage(// - final String addr, // 1 - final String brokerName, // 2 - final Message msg, // 3 - final SendMessageRequestHeader requestHeader, // 4 - final long 
timeoutMillis, // 5 - final CommunicationMode communicationMode, // 6 - final SendMessageContext context, // 7 - final DefaultMQProducerImpl producer // 8 - ) throws RemotingException, MQBrokerException, InterruptedException { - return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer); - } - - public SendResult sendMessage(// - final String addr, // 1 - final String brokerName, // 2 - final Message msg, // 3 - final SendMessageRequestHeader requestHeader, // 4 - final long timeoutMillis, // 5 - final CommunicationMode communicationMode, // 6 - final SendCallback sendCallback, // 7 - final TopicPublishInfo topicPublishInfo, // 8 - final MQClientInstance instance, // 9 - final int retryTimesWhenSendFailed, // 10 - final SendMessageContext context, // 11 - final DefaultMQProducerImpl producer // 12 - ) throws RemotingException, MQBrokerException, InterruptedException { - RemotingCommand request = null; - if (sendSmartMsg) { - SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); - request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2); - } else { - request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); - } - - request.setBody(msg.getBody()); - - switch (communicationMode) { - case ONEWAY: - this.remotingClient.invokeOneway(addr, request, timeoutMillis); - return null; - case ASYNC: - final AtomicInteger times = new AtomicInteger(); - this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, context, producer); - return null; - case SYNC: - return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request); - default: - assert false; - break; - } - - return null; - } - - private SendResult sendMessageSync(// - final String addr, // - final String brokerName, // - 
final Message msg, // - final long timeoutMillis, // - final RemotingCommand request// - ) throws RemotingException, MQBrokerException, InterruptedException { - RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); - assert response != null; - return this.processSendResponse(brokerName, msg, response); - } - - private void sendMessageAsync(// - final String addr, // - final String brokerName, // - final Message msg, // - final long timeoutMillis, // - final RemotingCommand request, // - final SendCallback sendCallback, // - final TopicPublishInfo topicPublishInfo, // - final MQClientInstance instance, // - final int retryTimesWhenSendFailed, // - final AtomicInteger times, // - final SendMessageContext context, // - final DefaultMQProducerImpl producer // - ) throws InterruptedException, RemotingException { - this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { - @Override - public void operationComplete(ResponseFuture responseFuture) { - RemotingCommand response = responseFuture.getResponseCommand(); - if (null == sendCallback && response != null) { - - try { - SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response); - if (context != null && sendResult != null) { - context.setSendResult(sendResult); - context.getProducer().executeSendMessageHookAfter(context); - } - } catch (Throwable e) { - // - } - - producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); - return; - } - - if (response != null) { - try { - SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response); - assert sendResult != null; - if (context != null) { - context.setSendResult(sendResult); - context.getProducer().executeSendMessageHookAfter(context); - } - - try { - sendCallback.onSuccess(sendResult); - } catch (Throwable e) { - } - - producer.updateFaultItem(brokerName, System.currentTimeMillis() - 
responseFuture.getBeginTimestamp(), false); - } catch (Exception e) { - producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); - onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, e, context, false, producer); - } - } else { - producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); - if (!responseFuture.isSendRequestOK()) { - MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); - onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, ex, context, true, producer); - } else if (responseFuture.isTimeout()) { - MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", - responseFuture.getCause()); - onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, ex, context, true, producer); - } else { - MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause()); - onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, ex, context, true, producer); - } - } - } - }); - } - - - private void onExceptionImpl(final String brokerName, // - final Message msg, // - final long timeoutMillis, // - final RemotingCommand request, // - final SendCallback sendCallback, // - final TopicPublishInfo topicPublishInfo, // - final MQClientInstance instance, // - final int timesTotal, // - final AtomicInteger curTimes, // - final Exception e, // - final SendMessageContext context, // - final boolean needRetry, // - final DefaultMQProducerImpl producer // 12 - ) { - int tmp = curTimes.incrementAndGet(); - if (needRetry && tmp <= timesTotal) { - MessageQueue tmpmq = 
producer.selectOneMessageQueue(topicPublishInfo, brokerName); - String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName()); - log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, - tmpmq.getBrokerName()); - try { - request.setOpaque(RemotingCommand.createNewRequestId()); - sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, - timesTotal, curTimes, context, producer); - } catch (InterruptedException e1) { - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, - context, false, producer); - } catch (RemotingConnectException e1) { - producer.updateFaultItem(brokerName, 3000, true); - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, - context, true, producer); - } catch (RemotingTooMuchRequestException e1) { - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, - context, false, producer); - } catch (RemotingException e1) { - producer.updateFaultItem(brokerName, 3000, true); - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, - context, true, producer); - } - } else { - if (context != null) { - context.setException(e); - context.getProducer().executeSendMessageHookAfter(context); - } - try { - sendCallback.onException(e); - } catch (Exception e2) { - } - } - } - - - private SendResult processSendResponse(// - final String brokerName, // - final Message msg, // - final RemotingCommand response// - ) throws MQBrokerException, RemotingCommandException { - switch (response.getCode()) { - case ResponseCode.FLUSH_DISK_TIMEOUT: - case ResponseCode.FLUSH_SLAVE_TIMEOUT: - case 
ResponseCode.SLAVE_NOT_AVAILABLE: { - // TODO LOG - } - case ResponseCode.SUCCESS: { - SendStatus sendStatus = SendStatus.SEND_OK; - switch (response.getCode()) { - case ResponseCode.FLUSH_DISK_TIMEOUT: - sendStatus = SendStatus.FLUSH_DISK_TIMEOUT; - break; - case ResponseCode.FLUSH_SLAVE_TIMEOUT: - sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT; - break; - case ResponseCode.SLAVE_NOT_AVAILABLE: - sendStatus = SendStatus.SLAVE_NOT_AVAILABLE; - break; - case ResponseCode.SUCCESS: - sendStatus = SendStatus.SEND_OK; - break; - default: - assert false; - break; - } - - SendMessageResponseHeader responseHeader = - (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class); - - MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId()); - - SendResult sendResult = new SendResult(sendStatus, - MessageClientIDSetter.getUniqID(msg), - responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); - sendResult.setTransactionId(responseHeader.getTransactionId()); - String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION); - String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH); - if (regionId == null || regionId.isEmpty()) { - regionId = MixAll.DEFAULT_TRACE_REGION_ID; - } - if (traceOn != null && traceOn.equals("false")) { - sendResult.setTraceOn(false); - } else { - sendResult.setTraceOn(true); - } - sendResult.setRegionId(regionId); - return sendResult; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public PullResult pullMessage(// - final String addr, // - final PullMessageRequestHeader requestHeader, // - final long timeoutMillis, // - final CommunicationMode communicationMode, // - final PullCallback pullCallback// - ) throws RemotingException, MQBrokerException, InterruptedException { - RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); - - switch (communicationMode) { - case ONEWAY: - assert false; - return null; - case ASYNC: - this.pullMessageAsync(addr, request, timeoutMillis, pullCallback); - return null; - case SYNC: - return this.pullMessageSync(addr, request, timeoutMillis); - default: - assert false; - break; - } - - return null; - } - - - private void pullMessageAsync(// - final String addr, // 1 - final RemotingCommand request, // - final long timeoutMillis, // - final PullCallback pullCallback// - ) throws RemotingException, InterruptedException { - this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { - @Override - public void operationComplete(ResponseFuture responseFuture) { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - try { - PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response); - assert pullResult != null; - pullCallback.onSuccess(pullResult); - } catch (Exception e) { - pullCallback.onException(e); - } - } else { - if (!responseFuture.isSendRequestOK()) { - pullCallback.onException(new MQClientException("send request failed", responseFuture.getCause())); - } else if (responseFuture.isTimeout()) { - pullCallback.onException(new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", - responseFuture.getCause())); - } else { - pullCallback.onException(new MQClientException("unknow reseaon", responseFuture.getCause())); - } - } - } - }); - } - - private PullResult pullMessageSync(// - final String addr, // 1 - final RemotingCommand request, // 2 - final long timeoutMillis// 3 - ) throws RemotingException, InterruptedException, MQBrokerException { - RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); - assert response != null; - return this.processPullResponse(response); - } - - private PullResult processPullResponse(final RemotingCommand 
response) throws MQBrokerException, RemotingCommandException { - PullStatus pullStatus = PullStatus.NO_NEW_MSG; - switch (response.getCode()) { - case ResponseCode.SUCCESS: - pullStatus = PullStatus.FOUND; - break; - case ResponseCode.PULL_NOT_FOUND: - pullStatus = PullStatus.NO_NEW_MSG; - break; - case ResponseCode.PULL_RETRY_IMMEDIATELY: - pullStatus = PullStatus.NO_MATCHED_MSG; - break; - case ResponseCode.PULL_OFFSET_MOVED: - pullStatus = PullStatus.OFFSET_ILLEGAL; - break; - - default: - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - PullMessageResponseHeader responseHeader = - (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); - - return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(), - responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody()); - } - - public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException { - ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader(); - requestHeader.setOffset(phyoffset); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody()); - MessageExt messageExt = MessageDecoder.clientDecode(byteBuffer, true); - return messageExt; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis) - throws 
RemotingException, MQBrokerException, InterruptedException { - SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setQueueId(queueId); - requestHeader.setTimestamp(timestamp); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - SearchOffsetResponseHeader responseHeader = - (SearchOffsetResponseHeader) response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); - return responseHeader.getOffset(); - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException { - GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setQueueId(queueId); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - GetMaxOffsetResponseHeader responseHeader = - (GetMaxOffsetResponseHeader) response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class); - - return responseHeader.getOffset(); - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public List<String> getConsumerIdListByGroup(// - final String addr, // - final String 
consumerGroup, // - final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - MQBrokerException, InterruptedException { - GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader(); - requestHeader.setConsumerGroup(consumerGroup); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - if (response.getBody() != null) { - GetConsumerListByGroupResponseBody body = - GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class); - return body.getConsumerIdList(); - } - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException { - GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setQueueId(queueId); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - GetMinOffsetResponseHeader responseHeader = - (GetMinOffsetResponseHeader) response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class); - - return responseHeader.getOffset(); - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), 
response.getRemark()); - } - - - public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException { - GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setQueueId(queueId); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - GetEarliestMsgStoretimeResponseHeader responseHeader = - (GetEarliestMsgStoretimeResponseHeader) response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class); - - return responseHeader.getTimestamp(); - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public long queryConsumerOffset(// - final String addr, // - final QueryConsumerOffsetRequestHeader requestHeader, // - final long timeoutMillis// - ) throws RemotingException, MQBrokerException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - QueryConsumerOffsetResponseHeader responseHeader = - (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class); - - return responseHeader.getOffset(); - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } 
- - - public void updateConsumerOffset(// - final String addr, // - final UpdateConsumerOffsetRequestHeader requestHeader, // - final long timeoutMillis// - ) throws RemotingException, MQBrokerException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public void updateConsumerOffsetOneway(// - final String addr, // - final UpdateConsumerOffsetRequestHeader requestHeader, // - final long timeoutMillis// - ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException, - InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); - - this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); - } - - - public void sendHearbeat(// - final String addr, // - final HeartbeatData heartbeatData, // - final long timeoutMillis// - ) throws RemotingException, MQBrokerException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); - - request.setBody(heartbeatData.encode()); - RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public void unregisterClient(// - final String addr, // - final String 
clientID, // - final String producerGroup, // - final String consumerGroup, // - final long timeoutMillis// - ) throws RemotingException, MQBrokerException, InterruptedException { - final UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader(); - requestHeader.setClientID(clientID); - requestHeader.setProducerGroup(producerGroup); - requestHeader.setConsumerGroup(consumerGroup); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public void endTransactionOneway(// - final String addr, // - final EndTransactionRequestHeader requestHeader, // - final String remark, // - final long timeoutMillis// - ) throws RemotingException, MQBrokerException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader); - - request.setRemark(remark); - this.remotingClient.invokeOneway(addr, request, timeoutMillis); - } - - - public void queryMessage( - final String addr, - final QueryMessageRequestHeader requestHeader, - final long timeoutMillis, - final InvokeCallback invokeCallback, - final Boolean isUnqiueKey - ) throws RemotingException, MQBrokerException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, requestHeader); - request.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, isUnqiueKey.toString()); - this.remotingClient.invokeAsync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis, - invokeCallback); - } - - - public boolean registerClient(final String addr, final HeartbeatData heartbeat, final long timeoutMillis) - 
throws RemotingException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); - - request.setBody(heartbeat.encode()); - RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); - return response.getCode() == ResponseCode.SUCCESS; - } - - - public void consumerSendMessageBack( - final String addr, - final MessageExt msg, - final String consumerGroup, - final int delayLevel, - final long timeoutMillis, - final int maxConsumeRetryTimes - ) throws RemotingException, MQBrokerException, InterruptedException { - ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader(); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader); - - requestHeader.setGroup(consumerGroup); - requestHeader.setOriginTopic(msg.getTopic()); - requestHeader.setOffset(msg.getCommitLogOffset()); - requestHeader.setDelayLevel(delayLevel); - requestHeader.setOriginMsgId(msg.getMsgId()); - requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public Set<MessageQueue> lockBatchMQ(// - final String addr, // - final LockBatchRequestBody requestBody, // - final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null); - - request.setBody(requestBody.encode()); - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - 
switch (response.getCode()) { - case ResponseCode.SUCCESS: { - LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class); - Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet(); - return messageQueues; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public void unlockBatchMQ(// - final String addr, // - final UnlockBatchRequestBody requestBody, // - final long timeoutMillis, // - final boolean oneway// - ) throws RemotingException, MQBrokerException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null); - - request.setBody(requestBody.encode()); - - if (oneway) { - this.remotingClient.invokeOneway(addr, request, timeoutMillis); - } else { - RemotingCommand response = this.remotingClient - .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - } - - - public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, final long timeoutMillis) throws InterruptedException, - RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { - GetTopicStatsInfoRequestHeader requestHeader = new GetTopicStatsInfoRequestHeader(); - requestHeader.setTopic(topic); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - TopicStatsTable topicStatsTable = TopicStatsTable.decode(response.getBody(), 
TopicStatsTable.class); - return topicStatsTable; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis) - throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, - MQBrokerException { - return getConsumeStats(addr, consumerGroup, null, timeoutMillis); - } - - - public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, final long timeoutMillis) - throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, - MQBrokerException { - GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader(); - requestHeader.setConsumerGroup(consumerGroup); - requestHeader.setTopic(topic); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - ConsumeStats consumeStats = ConsumeStats.decode(response.getBody(), ConsumeStats.class); - return consumeStats; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup, final long timeoutMillis) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, - MQBrokerException { - GetProducerConnectionListRequestHeader requestHeader = new GetProducerConnectionListRequestHeader(); - requestHeader.setProducerGroup(producerGroup); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_PRODUCER_CONNECTION_LIST, 
requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return ProducerConnection.decode(response.getBody(), ProducerConnection.class); - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup, final long timeoutMillis) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, - MQBrokerException { - GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader(); - requestHeader.setConsumerGroup(consumerGroup); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - ConsumerConnection consumerConnection = ConsumerConnection.decode(response.getBody(), ConsumerConnection.class); - return consumerConnection; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public KVTable getBrokerRuntimeInfo(final String addr, final long timeoutMillis) throws RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - switch (response.getCode()) { - case ResponseCode.SUCCESS: 
{ - return KVTable.decode(response.getBody(), KVTable.class); - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public void updateBrokerConfig(final String addr, final Properties properties, final long timeoutMillis) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, - MQBrokerException, UnsupportedEncodingException { - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_BROKER_CONFIG, null); - - String str = MixAll.properties2String(properties); - if (str != null && str.length() > 0) { - request.setBody(str.getBytes(MixAll.DEFAULT_CHARSET)); - RemotingCommand response = this.remotingClient - .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - } - - - public Properties getBrokerConfig(final String addr, final long timeoutMillis) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, - MQBrokerException, UnsupportedEncodingException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONFIG, null); - - RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return MixAll.string2Properties(new String(response.getBody(), MixAll.DEFAULT_CHARSET)); - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - public ClusterInfo getBrokerClusterInfo(final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException, MQBrokerException { - 
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null); - - RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - ClusterInfo responseBody = ClusterInfo.decode(response.getBody(), ClusterInfo.class); - return responseBody; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { - GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); - requestHeader.setTopic(topic); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.TOPIC_NOT_EXIST: { - // TODO LOG - break; - } - case ResponseCode.SUCCESS: { - byte[] body = response.getBody(); - if (body != null) { - return TopicRouteData.decode(body, TopicRouteData.class); - } - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { - GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); - requestHeader.setTopic(topic); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.TOPIC_NOT_EXIST: { - if 
(!topic.equals(MixAll.DEFAULT_TOPIC)) - log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic); - break; - } - case ResponseCode.SUCCESS: { - byte[] body = response.getBody(); - if (body != null) { - return TopicRouteData.decode(body, TopicRouteData.class); - } - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public TopicList getTopicListFromNameServer(final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER, null); - - RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - byte[] body = response.getBody(); - if (body != null) { - TopicList topicList = TopicList.decode(body, TopicList.class); - return topicList; - } - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, final long timeoutMillis) throws RemotingCommandException, - RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { - WipeWritePermOfBrokerRequestHeader requestHeader = new WipeWritePermOfBrokerRequestHeader(); - requestHeader.setBrokerName(brokerName); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.WIPE_WRITE_PERM_OF_BROKER, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - WipeWritePermOfBrokerResponseHeader responseHeader = - (WipeWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class); - return 
responseHeader.getWipeTopicCount(); - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader(); - requestHeader.setTopic(topic); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public void deleteTopicInNameServer(final String addr, final String topic, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader(); - requestHeader.setTopic(topic); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader(); - requestHeader.setGroupName(groupName); - 
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public String getKVConfigValue(final String namespace, final String key, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { - GetKVConfigRequestHeader requestHeader = new GetKVConfigRequestHeader(); - requestHeader.setNamespace(namespace); - requestHeader.setKey(key); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - GetKVConfigResponseHeader responseHeader = - (GetKVConfigResponseHeader) response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class); - return responseHeader.getValue(); - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public void putKVConfigValue(final String namespace, final String key, final String value, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { - PutKVConfigRequestHeader requestHeader = new PutKVConfigRequestHeader(); - requestHeader.setNamespace(namespace); - requestHeader.setKey(key); - requestHeader.setValue(value); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUT_KV_CONFIG, requestHeader); - - List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); - if (nameServerAddressList != null) { - 
RemotingCommand errResponse = null; - for (String namesrvAddr : nameServerAddressList) { - RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - break; - } - default: - errResponse = response; - } - } - - if (errResponse != null) { - throw new MQClientException(errResponse.getCode(), errResponse.getRemark()); - } - } - } - - - public void deleteKVConfigValue(final String namespace, final String key, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { - DeleteKVConfigRequestHeader requestHeader = new DeleteKVConfigRequestHeader(); - requestHeader.setNamespace(namespace); - requestHeader.setKey(key); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_KV_CONFIG, requestHeader); - - List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); - if (nameServerAddressList != null) { - RemotingCommand errResponse = null; - for (String namesrvAddr : nameServerAddressList) { - RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - break; - } - default: - errResponse = response; - } - } - if (errResponse != null) { - throw new MQClientException(errResponse.getCode(), errResponse.getRemark()); - } - } - } - - - public KVTable getKVListByNamespace(final String namespace, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { - GetKVListByNamespaceRequestHeader requestHeader = new GetKVListByNamespaceRequestHeader(); - requestHeader.setNamespace(namespace); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_KVLIST_BY_NAMESPACE, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); - assert 
response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return KVTable.decode(response.getBody(), KVTable.class); - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group, - final long timestamp, final boolean isForce, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { - return invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, false); - } - - - public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group, - final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC) - throws RemotingException, MQClientException, InterruptedException { - ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setGroup(group); - requestHeader.setTimestamp(timestamp); - requestHeader.setForce(isForce); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader); - if (isC) { - request.setLanguage(LanguageCode.CPP); - } - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - if (response.getBody() != null) { - ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class); - return body.getOffsetTable(); - } - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group, - final String clientAddr, final long 
timeoutMillis) throws RemotingException, MQClientException, InterruptedException { - GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setGroup(group); - requestHeader.setClientAddr(clientAddr); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - if (response.getBody() != null) { - GetConsumerStatusBody body = GetConsumerStatusBody.decode(response.getBody(), GetConsumerStatusBody.class); - return body.getConsumerTable(); - } - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public GroupList queryTopicConsumeByWho(final String addr, final String topic, final long timeoutMillis) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, - MQBrokerException { - QueryTopicConsumeByWhoRequestHeader requestHeader = new QueryTopicConsumeByWhoRequestHeader(); - requestHeader.setTopic(topic); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - GroupList groupList = GroupList.decode(response.getBody(), GroupList.class); - return groupList; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group, 
final long timeoutMillis) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, - MQBrokerException { - QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setGroup(group); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - QueryConsumeTimeSpanBody consumeTimeSpanBody = GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class); - return consumeTimeSpanBody.getConsumeTimeSpanSet(); - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public TopicList getTopicsByCluster(final String cluster, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { - GetTopicsByClusterRequestHeader requestHeader = new GetTopicsByClusterRequestHeader(); - requestHeader.setCluster(cluster); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPICS_BY_CLUSTER, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - byte[] body = response.getBody(); - if (body != null) { - TopicList topicList = TopicList.decode(body, TopicList.class); - return topicList; - } - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public void registerMessageFilterClass(final String addr, // - final String consumerGroup, // - final String topic, // - final String className, // - final int classCRC, // - final byte[] classBody, // - final 
long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - InterruptedException, MQBrokerException { - RegisterMessageFilterClassRequestHeader requestHeader = new RegisterMessageFilterClassRequestHeader(); - requestHeader.setConsumerGroup(consumerGroup); - requestHeader.setClassName(className); - requestHeader.setTopic(topic); - requestHeader.setClassCRC(classCRC); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_MESSAGE_FILTER_CLASS, requestHeader); - request.setBody(classBody); - RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - - public TopicList getSystemTopicList(final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null); - - RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - byte[] body = response.getBody(); - if (body != null) { - TopicList topicList = TopicList.decode(response.getBody(), TopicList.class); - if (topicList.getTopicList() != null && !topicList.getTopicList().isEmpty() - && !UtilAll.isBlank(topicList.getBrokerAddr())) { - TopicList tmp = getSystemTopicListFromBroker(topicList.getBrokerAddr(), timeoutMillis); - if (tmp.getTopicList() != null && !tmp.getTopicList().isEmpty()) { - topicList.getTopicList().addAll(tmp.getTopicList()); - } - } - return topicList; - } - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public TopicList getSystemTopicListFromBroker(final String addr, final long timeoutMillis) - 
throws RemotingException, MQClientException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER, null); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - byte[] body = response.getBody(); - if (body != null) { - TopicList topicList = TopicList.decode(body, TopicList.class); - return topicList; - } - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public boolean cleanExpiredConsumeQueue(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, null); - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return true; - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public boolean cleanUnusedTopicByAddr(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null); - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return true; - } - default: - break; - } - - throw new 
MQClientException(response.getCode(), response.getRemark()); - } - - public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId, boolean jstack, - final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { - GetConsumerRunningInfoRequestHeader requestHeader = new GetConsumerRunningInfoRequestHeader(); - requestHeader.setConsumerGroup(consumerGroup); - requestHeader.setClientId(clientId); - requestHeader.setJstackEnable(jstack); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - byte[] body = response.getBody(); - if (body != null) { - ConsumerRunningInfo info = ConsumerRunningInfo.decode(body, ConsumerRunningInfo.class); - return info; - } - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, // - String consumerGroup, // - String clientId, // - String msgId, // - final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { - ConsumeMessageDirectlyResultRequestHeader requestHeader = new ConsumeMessageDirectlyResultRequestHeader(); - requestHeader.setConsumerGroup(consumerGroup); - requestHeader.setClientId(clientId); - requestHeader.setMsgId(msgId); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case 
ResponseCode.SUCCESS: { - byte[] body = response.getBody(); - if (body != null) { - ConsumeMessageDirectlyResult info = ConsumeMessageDirectlyResult.decode(body, ConsumeMessageDirectlyResult.class); - return info; - } - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic, final String group, Set<String> filterGroup, - long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - InterruptedException { - QueryCorrectionOffsetHeader requestHeader = new QueryCorrectionOffsetHeader(); - requestHeader.setCompareGroup(group); - requestHeader.setTopic(topic); - if (filterGroup != null) { - StringBuilder sb = new StringBuilder(); - String splitor = ""; - for (String s : filterGroup) { - sb.append(splitor).append(s); - splitor = ","; - } - requestHeader.setFilterGroups(sb.toString()); - } - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CORRECTION_OFFSET, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - if (response.getBody() != null) { - QueryCorrectionOffsetBody body = QueryCorrectionOffsetBody.decode(response.getBody(), QueryCorrectionOffsetBody.class); - return body.getCorrectionOffsets(); - } - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - public TopicList getUnitTopicList(final boolean containRetry, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_UNIT_TOPIC_LIST, null); - - RemotingCommand response = 
this.remotingClient.invokeSync(null, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - byte[] body = response.getBody(); - if (body != null) { - TopicList topicList = TopicList.decode(response.getBody(), TopicList.class); - if (!containRetry) { - /* removal goes through the iterator so the list can be filtered while it is being walked */ Iterator<String> it = topicList.getTopicList().iterator(); - while (it.hasNext()) { - String topic = it.next(); - if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) - it.remove(); - } - } - - return topicList; - } - } - /* the SUCCESS case has no break: a reply with a null body falls through to default and ends in the throw below */ default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - /** Sends a GET_HAS_UNIT_SUB_TOPIC_LIST request with no custom header and returns the TopicList decoded from the response body. When containRetry is false, every topic whose name starts with MixAll.RETRY_GROUP_TOPIC_PREFIX is removed from the list before it is returned. NOTE(review): invokeSync is called with a null address, presumably the remoting client then selects a name server, confirm against the RemotingClient implementation. @throws MQClientException carrying the response code and remark when the code is not SUCCESS, or when it is SUCCESS but the body is null. */ public TopicList getHasUnitSubTopicList(final boolean containRetry, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST, null); - - RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - byte[] body = response.getBody(); - if (body != null) { - TopicList topicList = TopicList.decode(response.getBody(), TopicList.class); - if (!containRetry) { - Iterator<String> it = topicList.getTopicList().iterator(); - while (it.hasNext()) { - String topic = it.next(); - if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) - it.remove(); - } - } - return topicList; - } - } - /* the SUCCESS case has no break: a reply with a null body falls through to default and ends in the throw below */ default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - /** Sends a GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST request with no custom header and returns the TopicList decoded from the response body. When containRetry is false, every topic whose name starts with MixAll.RETRY_GROUP_TOPIC_PREFIX is removed from the list before it is returned. NOTE(review): invokeSync is called with a null address, presumably the remoting client then selects a name server, confirm against the RemotingClient implementation. @throws MQClientException carrying the response code and remark when the code is not SUCCESS, or when it is SUCCESS but the body is null. */ public TopicList getHasUnitSubUnUnitTopicList(final boolean containRetry, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST, null); - - RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); - assert response !=
null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - byte[] body = response.getBody(); - if (body != null) { - TopicList topicList = TopicList.decode(response.getBody(), TopicList.class); - if (!containRetry) { - /* removal goes through the iterator so the list can be filtered while it is being walked */ Iterator<String> it = topicList.getTopicList().iterator(); - while (it.hasNext()) { - String topic = it.next(); - if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) - it.remove(); - } - } - return topicList; - } - } - /* the SUCCESS case has no break: a reply with a null body falls through to default and ends in the throw below */ default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - /** Sends a CLONE_GROUP_OFFSET request carrying srcGroup, destGroup, topic and the isOffline flag, and returns normally when the response code is SUCCESS. The target address is addr after mapping by MixAll.brokerVIPChannel with the client's VIP-channel setting. @throws MQClientException carrying the response code and remark for any response code other than SUCCESS. */ public void cloneGroupOffset(final String addr, final String srcGroup, final String destGroup, final String topic, - final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { - CloneGroupOffsetRequestHeader requestHeader = new CloneGroupOffsetRequestHeader(); - requestHeader.setSrcGroup(srcGroup); - requestHeader.setDestGroup(destGroup); - requestHeader.setTopic(topic); - requestHeader.setOffline(isOffline); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLONE_GROUP_OFFSET, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); - /* only checked when the JVM runs with assertions enabled */ assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQClientException(response.getCode(), response.getRemark()); - } - - - public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey, long timeoutMillis) - throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - InterruptedException
<TRUNCATED>
