http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java new file mode 100644 index 0000000..62af958 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -0,0 +1,1080 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.producer; + +import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.Validators; +import org.apache.rocketmq.client.common.ClientErrorCode; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.hook.CheckForbiddenContext; +import org.apache.rocketmq.client.hook.CheckForbiddenHook; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.latency.MQFaultStrategy; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ServiceState; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.message.*; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; +import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.client.producer.*; +import org.slf4j.Logger; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.*; +import java.util.concurrent.*; + + +/** + * @author shijia.wxr + */ +public class DefaultMQProducerImpl implements MQProducerInner { + private final Logger log = ClientLogger.getLog(); + private final Random random = new Random(); + private final DefaultMQProducer defaultMQProducer; + private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = + new ConcurrentHashMap<String, TopicPublishInfo>(); + private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); + private final RPCHook rpcHook; + protected BlockingQueue<Runnable> checkRequestQueue; + protected ExecutorService checkExecutor; + private ServiceState serviceState = ServiceState.CREATE_JUST; + private MQClientInstance mQClientFactory; + private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>(); + private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); + + private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); + + + public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { + this(defaultMQProducer, null); + } + + + public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { + this.defaultMQProducer = defaultMQProducer; + this.rpcHook = rpcHook; + } + + public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) { + this.checkForbiddenHookList.add(checkForbiddenHook); + log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(), + checkForbiddenHookList.size()); + } + + public void initTransactionEnv() { + TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer; + this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax()); + this.checkExecutor = new ThreadPoolExecutor(// + producer.getCheckThreadPoolMinSize(), // + producer.getCheckThreadPoolMaxSize(), // + 1000 * 60, // + TimeUnit.MILLISECONDS, // + this.checkRequestQueue); + } + + public void destroyTransactionEnv() { + this.checkExecutor.shutdown(); + this.checkRequestQueue.clear(); + } + + public void registerSendMessageHook(final SendMessageHook hook) { + this.sendMessageHookList.add(hook); + log.info("register sendMessage Hook, {}", hook.hookName()); + } + + public void start() throws MQClientException { + this.start(true); + } + + public void start(final boolean startFactory) throws MQClientException { + switch (this.serviceState) { + case CREATE_JUST: + this.serviceState = ServiceState.START_FAILED; + + this.checkConfig(); + + if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { + this.defaultMQProducer.changeInstanceNameToPID(); + } + + this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); + + boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); + if (!registerOK) { + this.serviceState = ServiceState.CREATE_JUST; + throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), + null); + } + + this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); + + if (startFactory) { + mQClientFactory.start(); + } + + log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), + this.defaultMQProducer.isSendMessageWithVIPChannel()); + this.serviceState = ServiceState.RUNNING; + break; + case RUNNING: + case START_FAILED: + case SHUTDOWN_ALREADY: + throw new MQClientException("The producer service state not OK, maybe started once, "// + + this.serviceState// + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); + default: + break; + } + + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } + + private void checkConfig() throws MQClientException { + Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); + + if (null == this.defaultMQProducer.getProducerGroup()) { + throw new MQClientException("producerGroup is null", null); + } + + if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) { + throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.", + null); + } + } + + public void shutdown() { + this.shutdown(true); + } + + public void shutdown(final boolean shutdownFactory) { + switch (this.serviceState) { + case CREATE_JUST: + break; + case RUNNING: + this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup()); + if (shutdownFactory) { + this.mQClientFactory.shutdown(); + } + + log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup()); + this.serviceState = ServiceState.SHUTDOWN_ALREADY; + break; + case SHUTDOWN_ALREADY: + break; + default: + break; + } + } + + @Override + public Set<String> getPublishTopicList() { + Set<String> topicList = new HashSet<String>(); + for (String key : this.topicPublishInfoTable.keySet()) { + topicList.add(key); + } + + return topicList; + } + + @Override + public boolean isPublishTopicNeedUpdate(String topic) { + TopicPublishInfo prev = this.topicPublishInfoTable.get(topic); + + return null == prev || !prev.ok(); + } + + @Override + public TransactionCheckListener checkListener() { + if (this.defaultMQProducer instanceof TransactionMQProducer) { + TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; + return producer.getTransactionCheckListener(); + } + + return null; + } + + @Override + public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) { + Runnable request = new Runnable() { + private final String brokerAddr = addr; + private final MessageExt message = msg; + private final CheckTransactionStateRequestHeader checkRequestHeader = header; + private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup(); + + + @Override + public void run() { + TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); + if (transactionCheckListener != null) { + LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; + Throwable exception = null; + try { + localTransactionState = transactionCheckListener.checkLocalTransactionState(message); + } catch (Throwable e) { + log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); + exception = e; + } + + this.processTransactionState(// + localTransactionState, // + group, // + exception); + } else { + log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group); + } + } + + + private void processTransactionState(// + final LocalTransactionState localTransactionState, // + final String producerGroup, // + final Throwable exception) { + final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader(); + thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset()); + thisHeader.setProducerGroup(producerGroup); + thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset()); + thisHeader.setFromTransactionCheck(true); + + String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + if (uniqueKey == null) { + uniqueKey = message.getMsgId(); + } + thisHeader.setMsgId(uniqueKey); + thisHeader.setTransactionId(checkRequestHeader.getTransactionId()); + switch (localTransactionState) { + case COMMIT_MESSAGE: + thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); + break; + case ROLLBACK_MESSAGE: + thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); + log.warn("when broker check, client rollback this transaction, {}", thisHeader); + break; + case UNKNOW: + thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); + log.warn("when broker check, client does not know this transaction state, {}", thisHeader); + break; + default: + break; + } + + String remark = null; + if (exception != null) { + remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception); + } + + try { + DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, + 3000); + } catch (Exception e) { + log.error("endTransactionOneway exception", e); + } + } + }; + + this.checkExecutor.submit(request); + } + + @Override + public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) { + if (info != null && topic != null) { + TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info); + if (prev != null) { + log.info("updateTopicPublishInfo prev is not null, " + prev.toString()); + } + } + } + + @Override + public boolean isUnitMode() { + return this.defaultMQProducer.isUnitMode(); + } + + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { + createTopic(key, newTopic, queueNum, 0); + } + + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { + this.makeSureStateOK(); + Validators.checkTopic(newTopic); + + this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); + } + + private void makeSureStateOK() throws MQClientException { + if (this.serviceState != ServiceState.RUNNING) { + throw new MQClientException("The producer service state not OK, "// + + this.serviceState// + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); + } + } + + public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException { + this.makeSureStateOK(); + return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic); + } + + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + this.makeSureStateOK(); + return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); + } + + public long maxOffset(MessageQueue mq) throws MQClientException { + this.makeSureStateOK(); + return this.mQClientFactory.getMQAdminImpl().maxOffset(mq); + } + + public long minOffset(MessageQueue mq) throws MQClientException { + this.makeSureStateOK(); + return this.mQClientFactory.getMQAdminImpl().minOffset(mq); + } + + public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { + this.makeSureStateOK(); + return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq); + } + + public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.makeSureStateOK(); + + return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId); + } + + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + this.makeSureStateOK(); + return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); + } + + public MessageExt queryMessageByUniqKey(String topic, String uniqKey) + throws MQClientException, InterruptedException { + this.makeSureStateOK(); + return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); + } + + /** + * DEFAULT ASYNC ------------------------------------------------------- + */ + public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); + } + + public void send(Message msg, SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { + try { + this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout); + } catch (MQBrokerException e) { + throw new MQClientException("unknownn exception", e); + } + } + + public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { + return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); + } + + public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { + this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); + } + + private SendResult sendDefaultImpl(// + Message msg, // + final CommunicationMode communicationMode, // + final SendCallback sendCallback, // + final long timeout// + ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + this.makeSureStateOK(); + Validators.checkMessage(msg, this.defaultMQProducer); + + final long invokeID = random.nextLong(); + long beginTimestampFirst = System.currentTimeMillis(); + long beginTimestampPrev = beginTimestampFirst; + long endTimestamp = beginTimestampFirst; + TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); + if (topicPublishInfo != null && topicPublishInfo.ok()) { + MessageQueue mq = null; + Exception exception = null; + SendResult sendResult = null; + int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; + int times = 0; + String[] brokersSent = new String[timesTotal]; + for (; times < timesTotal; times++) { + String lastBrokerName = null == mq ? null : mq.getBrokerName(); + MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); + if (tmpmq != null) { + mq = tmpmq; + brokersSent[times] = mq.getBrokerName(); + try { + beginTimestampPrev = System.currentTimeMillis(); + sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); + endTimestamp = System.currentTimeMillis(); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); + switch (communicationMode) { + case ASYNC: + return null; + case ONEWAY: + return null; + case SYNC: + if (sendResult.getSendStatus() != SendStatus.SEND_OK) { + if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { + continue; + } + } + + return sendResult; + default: + break; + } + } catch (RemotingException e) { + endTimestamp = System.currentTimeMillis(); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); + log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); + log.warn(msg.toString()); + exception = e; + continue; + } catch (MQClientException e) { + endTimestamp = System.currentTimeMillis(); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); + log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); + log.warn(msg.toString()); + exception = e; + continue; + } catch (MQBrokerException e) { + endTimestamp = System.currentTimeMillis(); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); + log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); + log.warn(msg.toString()); + exception = e; + switch (e.getResponseCode()) { + case ResponseCode.TOPIC_NOT_EXIST: + case ResponseCode.SERVICE_NOT_AVAILABLE: + case ResponseCode.SYSTEM_ERROR: + case ResponseCode.NO_PERMISSION: + case ResponseCode.NO_BUYER_ID: + case ResponseCode.NOT_IN_CURRENT_UNIT: + continue; + default: + if (sendResult != null) { + return sendResult; + } + + throw e; + } + } catch (InterruptedException e) { + endTimestamp = System.currentTimeMillis(); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); + log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); + log.warn(msg.toString()); + + log.warn("sendKernelImpl exception", e); + log.warn(msg.toString()); + throw e; + } + } else { + break; + } + } + + if (sendResult != null) { + return sendResult; + } + + String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", + times, + System.currentTimeMillis() - beginTimestampFirst, + msg.getTopic(), + Arrays.toString(brokersSent)); + + info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); + + MQClientException mqClientException = new MQClientException(info, exception); + if (exception instanceof MQBrokerException) { + mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); + } else if (exception instanceof RemotingConnectException) { + mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); + } else if (exception instanceof RemotingTimeoutException) { + mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); + } else if (exception instanceof MQClientException) { + mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); + } + + throw mqClientException; + } + + List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); + if (null == nsList || nsList.isEmpty()) { + throw new MQClientException( + "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); + } + + throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), + null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); + } + + private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { + TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); + if (null == topicPublishInfo || !topicPublishInfo.ok()) { + this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); + this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); + topicPublishInfo = this.topicPublishInfoTable.get(topic); + } + + if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { + return topicPublishInfo; + } else { + this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); + topicPublishInfo = this.topicPublishInfoTable.get(topic); + return topicPublishInfo; + } + } + + private SendResult sendKernelImpl(final Message msg, // + final MessageQueue mq, // + final CommunicationMode communicationMode, // + final SendCallback sendCallback, // + final TopicPublishInfo topicPublishInfo, // + final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); + if (null == brokerAddr) { + tryToFindTopicPublishInfo(mq.getTopic()); + brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); + } + + SendMessageContext context = null; + if (brokerAddr != null) { + brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); + + byte[] prevBody = msg.getBody(); + try { + + MessageClientIDSetter.setUniqID(msg); + + int sysFlag = 0; + if (this.tryToCompressMessage(msg)) { + sysFlag |= MessageSysFlag.COMPRESSED_FLAG; + } + + final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); + if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { + sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; + } + + if (hasCheckForbiddenHook()) { + CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); + checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); + checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); + checkForbiddenContext.setCommunicationMode(communicationMode); + checkForbiddenContext.setBrokerAddr(brokerAddr); + checkForbiddenContext.setMessage(msg); + checkForbiddenContext.setMq(mq); + checkForbiddenContext.setUnitMode(this.isUnitMode()); + this.executeCheckForbiddenHook(checkForbiddenContext); + } + + if (this.hasSendMessageHook()) { + context = new SendMessageContext(); + context.setProducer(this); + context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); + context.setCommunicationMode(communicationMode); + context.setBornHost(this.defaultMQProducer.getClientIP()); + context.setBrokerAddr(brokerAddr); + context.setMessage(msg); + context.setMq(mq); + String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); + if (isTrans != null && isTrans.equals("true")) { + context.setMsgType(MessageType.Trans_Msg_Half); + } + + if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { + context.setMsgType(MessageType.Delay_Msg); + } + this.executeSendMessageHookBefore(context); + } + + SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); + requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); + requestHeader.setTopic(msg.getTopic()); + requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); + requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setSysFlag(sysFlag); + requestHeader.setBornTimestamp(System.currentTimeMillis()); + requestHeader.setFlag(msg.getFlag()); + requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); + requestHeader.setReconsumeTimes(0); + requestHeader.setUnitMode(this.isUnitMode()); + if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); + if (reconsumeTimes != null) { + requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); + MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); + } + + String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); + if (maxReconsumeTimes != null) { + requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); + MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); + } + } + + SendResult sendResult = null; + switch (communicationMode) { + case ASYNC: + sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(// + brokerAddr, // 1 + mq.getBrokerName(), // 2 + msg, // 3 + requestHeader, // 4 + timeout, // 5 + communicationMode, // 6 + sendCallback, // 7 + topicPublishInfo, // 8 + this.mQClientFactory, // 9 + this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10 + context, // + this); + break; + case ONEWAY: + case SYNC: + sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( + brokerAddr, + mq.getBrokerName(), + msg, + requestHeader, + timeout, + communicationMode, + context, + this); + break; + default: + assert false; + break; + } + + if (this.hasSendMessageHook()) { + context.setSendResult(sendResult); + this.executeSendMessageHookAfter(context); + } + + return sendResult; + } catch (RemotingException e) { + if (this.hasSendMessageHook()) { + context.setException(e); + this.executeSendMessageHookAfter(context); + } + throw e; + } catch (MQBrokerException e) { + if (this.hasSendMessageHook()) { + context.setException(e); + this.executeSendMessageHookAfter(context); + } + throw e; + } catch (InterruptedException e) { + if (this.hasSendMessageHook()) { + context.setException(e); + this.executeSendMessageHookAfter(context); + } + throw e; + } finally { + msg.setBody(prevBody); + } + } + + throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); + } + + public MQClientInstance getmQClientFactory() { + return mQClientFactory; + } + + private boolean tryToCompressMessage(final Message msg) { + byte[] body = msg.getBody(); + if (body != null) { + if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) { + try { + byte[] data = UtilAll.compress(body, zipCompressLevel); + if (data != null) { + msg.setBody(data); + return true; + } + } catch (IOException e) { + log.error("tryToCompressMessage exception", e); + log.warn(msg.toString()); + } + } + } + + return false; + } + + public boolean hasCheckForbiddenHook() { + return !checkForbiddenHookList.isEmpty(); + } + + public void executeCheckForbiddenHook(final CheckForbiddenContext context) throws MQClientException { + if (hasCheckForbiddenHook()) { + for (CheckForbiddenHook hook : checkForbiddenHookList) { + hook.checkForbidden(context); + } + } + } + + public boolean hasSendMessageHook() { + return !this.sendMessageHookList.isEmpty(); + } + + public void executeSendMessageHookBefore(final SendMessageContext context) { + if (!this.sendMessageHookList.isEmpty()) { + for (SendMessageHook hook : this.sendMessageHookList) { + try { + hook.sendMessageBefore(context); + } catch (Throwable e) { + log.warn("failed to executeSendMessageHookBefore", e); + } + } + } + } + + public void executeSendMessageHookAfter(final SendMessageContext context) { + if (!this.sendMessageHookList.isEmpty()) { + for (SendMessageHook hook : this.sendMessageHookList) { + try { + hook.sendMessageAfter(context); + } catch (Throwable e) { + log.warn("failed to executeSendMessageHookAfter", e); + } + } + } + } + + /** + * DEFAULT ONEWAY ------------------------------------------------------- + */ + public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { + try { + this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout()); + } catch (MQBrokerException e) { + throw new MQClientException("unknown exception", e); + } + } + + /** + * KERNEL SYNC ------------------------------------------------------- + */ + public SendResult send(Message msg, MessageQueue mq) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return send(msg, mq, this.defaultMQProducer.getSendMsgTimeout()); + } + + public SendResult send(Message msg, MessageQueue mq, long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + this.makeSureStateOK(); + Validators.checkMessage(msg, this.defaultMQProducer); + + if (!msg.getTopic().equals(mq.getTopic())) { + throw new MQClientException("message's topic not equal mq's topic", null); + } + + return this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout); + } + + /** + * KERNEL ASYNC ------------------------------------------------------- + */ + public void send(Message msg, MessageQueue mq, SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException { + send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); + } + + public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { + this.makeSureStateOK(); + Validators.checkMessage(msg, this.defaultMQProducer); + + if (!msg.getTopic().equals(mq.getTopic())) { + throw new MQClientException("message's topic not equal mq's topic", null); + } + + try { + this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout); + } catch (MQBrokerException e) { + throw new MQClientException("unknown exception", e); + } + } + + /** + * KERNEL ONEWAY ------------------------------------------------------- + */ + public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { + this.makeSureStateOK(); + Validators.checkMessage(msg, this.defaultMQProducer); + + try { + this.sendKernelImpl(msg, mq, CommunicationMode.ONEWAY, null, null, this.defaultMQProducer.getSendMsgTimeout()); + } catch (MQBrokerException e) { + throw new MQClientException("unknown exception", e); + } + } + + /** + * SELECT SYNC ------------------------------------------------------- + */ + public SendResult send(Message msg, MessageQueueSelector selector, Object arg) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout()); + } + + public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout); + } + + private SendResult sendSelectImpl(// + Message msg, // + MessageQueueSelector selector, // + Object arg, // + final CommunicationMode communicationMode, // + final SendCallback sendCallback, final long timeout// + ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + this.makeSureStateOK(); + Validators.checkMessage(msg, this.defaultMQProducer); + + TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); + if (topicPublishInfo != null && topicPublishInfo.ok()) { + MessageQueue mq = null; + try { + mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); + } catch (Throwable e) { + throw new MQClientException("select message queue throwed exception.", e); + } + + if (mq != null) { + return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout); + } else { + throw new MQClientException("select message queue return null.", null); + } + } + + throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); + } + + /** + * SELECT ASYNC ------------------------------------------------------- + */ + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException { + send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); + } + + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { + try { + this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout); + } catch (MQBrokerException e) { + throw new MQClientException("unknownn exception", e); + } + } + + /** + * SELECT ONEWAY ------------------------------------------------------- + */ + public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) + throws MQClientException, RemotingException, InterruptedException { + try { + this.sendSelectImpl(msg, selector, arg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout()); + } catch (MQBrokerException e) { + throw new MQClientException("unknown exception", e); + } + } + + public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) + throws MQClientException { + if (null == tranExecuter) { + throw new MQClientException("tranExecutor is null", null); + } + Validators.checkMessage(msg, this.defaultMQProducer); + + SendResult sendResult = null; + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); + try { + sendResult = this.send(msg); + } catch (Exception e) { + throw new MQClientException("send message Exception", e); + } + + LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; + Throwable localException = null; + switch (sendResult.getSendStatus()) { + case SEND_OK: { + try { + if (sendResult.getTransactionId() != null) { + msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); + } + localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); + if (null == localTransactionState) { + localTransactionState = LocalTransactionState.UNKNOW; + } + + if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { + log.info("executeLocalTransactionBranch return {}", localTransactionState); + log.info(msg.toString()); + } + } catch (Throwable e) { + log.info("executeLocalTransactionBranch exception", e); + log.info(msg.toString()); + localException = e; + } + } + break; + case FLUSH_DISK_TIMEOUT: + case FLUSH_SLAVE_TIMEOUT: + case SLAVE_NOT_AVAILABLE: + localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; + break; + default: + break; + } + + try { + this.endTransaction(sendResult, localTransactionState, localException); + } catch (Exception e) { + log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); + } + + TransactionSendResult transactionSendResult = new TransactionSendResult(); + transactionSendResult.setSendStatus(sendResult.getSendStatus()); + transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); + transactionSendResult.setMsgId(sendResult.getMsgId()); + transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); + transactionSendResult.setTransactionId(sendResult.getTransactionId()); + transactionSendResult.setLocalTransactionState(localTransactionState); + return transactionSendResult; + } + + /** + * DEFAULT SYNC ------------------------------------------------------- + */ + public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return send(msg, this.defaultMQProducer.getSendMsgTimeout()); + } + + public void endTransaction(// + final SendResult sendResult, // + final LocalTransactionState localTransactionState, // + final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { + final MessageId id; + if (sendResult.getOffsetMsgId() != null) { + id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); + } else { + id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); + } + String transactionId = sendResult.getTransactionId(); + final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); + EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); + requestHeader.setTransactionId(transactionId); + requestHeader.setCommitLogOffset(id.getOffset()); + switch (localTransactionState) { + case COMMIT_MESSAGE: + requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); + break; + case ROLLBACK_MESSAGE: + requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); + break; + case UNKNOW: + requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); + break; + default: + break; + } + + requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); + requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); + requestHeader.setMsgId(sendResult.getMsgId()); + String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; + this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, + this.defaultMQProducer.getSendMsgTimeout()); + } + + public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); + } + + public ConcurrentHashMap<String, TopicPublishInfo> getTopicPublishInfoTable() { + return topicPublishInfoTable; + } + + public int getZipCompressLevel() { + return zipCompressLevel; + } + + + public void setZipCompressLevel(int zipCompressLevel) { + this.zipCompressLevel = zipCompressLevel; + } + + + public ServiceState getServiceState() { + return serviceState; + } + + + public void setServiceState(ServiceState serviceState) { + this.serviceState = serviceState; + } + + public long[] getNotAvailableDuration() { + return this.mqFaultStrategy.getNotAvailableDuration(); + } + + public void setNotAvailableDuration(final long[] notAvailableDuration) { + this.mqFaultStrategy.setNotAvailableDuration(notAvailableDuration); + } + + public long[] getLatencyMax() { + return this.mqFaultStrategy.getLatencyMax(); + } + + public void setLatencyMax(final long[] latencyMax) { + this.mqFaultStrategy.setLatencyMax(latencyMax); + } + + public boolean isSendLatencyFaultEnable() { + return this.mqFaultStrategy.isSendLatencyFaultEnable(); + } + + public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { + this.mqFaultStrategy.setSendLatencyFaultEnable(sendLatencyFaultEnable); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java new file mode 100644 index 0000000..c196a43 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.producer; + +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; + +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public interface MQProducerInner { + Set<String> getPublishTopicList(); + + + boolean isPublishTopicNeedUpdate(final String topic); + + + TransactionCheckListener checkListener(); + + + void checkTransactionState(// + final String addr, // + final MessageExt msg, // + final CheckTransactionStateRequestHeader checkRequestHeader); + + + void updateTopicPublishInfo(final String topic, final TopicPublishInfo info); + + + boolean isUnitMode(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java new file mode 100644 index 0000000..5267625 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.producer; + +import org.apache.rocketmq.client.common.ThreadLocalIndex; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; + +import java.util.ArrayList; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class TopicPublishInfo { + private boolean orderTopic = false; + private boolean haveTopicRouterInfo = false; + private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); + private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0); + private TopicRouteData topicRouteData; + + + public boolean isOrderTopic() { + return orderTopic; + } + + public void setOrderTopic(boolean orderTopic) { + this.orderTopic = orderTopic; + } + + public boolean ok() { + return null != this.messageQueueList && !this.messageQueueList.isEmpty(); + } + + public List<MessageQueue> getMessageQueueList() { + return messageQueueList; + } + + + public void setMessageQueueList(List<MessageQueue> messageQueueList) { + this.messageQueueList = messageQueueList; + } + + + public ThreadLocalIndex getSendWhichQueue() { + return sendWhichQueue; + } + + + public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) { + this.sendWhichQueue = sendWhichQueue; + } + + + public boolean isHaveTopicRouterInfo() { + return haveTopicRouterInfo; + } + + + public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) { + this.haveTopicRouterInfo = haveTopicRouterInfo; + } + + + public MessageQueue selectOneMessageQueue(final String lastBrokerName) { + if (lastBrokerName == null) { + return selectOneMessageQueue(); + } else { + int index = this.sendWhichQueue.getAndIncrement(); + for (int i = 0; i < this.messageQueueList.size(); i++) { + int pos = Math.abs(index++) % this.messageQueueList.size(); + if (pos < 0) + pos = 0; + MessageQueue mq = this.messageQueueList.get(pos); + if (!mq.getBrokerName().equals(lastBrokerName)) { + return mq; + } + } + return selectOneMessageQueue(); + } + } + + + public MessageQueue selectOneMessageQueue() { + int index = this.sendWhichQueue.getAndIncrement(); + int pos = Math.abs(index) % this.messageQueueList.size(); + if (pos < 0) + pos = 0; + return this.messageQueueList.get(pos); + } + + public int getQueueIdByBroker(final String brokerName) { + for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) { + final QueueData queueData = this.topicRouteData.getQueueDatas().get(i); + if (queueData.getBrokerName().equals(brokerName)) { + return queueData.getWriteQueueNums(); + } + } + + return -1; + } + + + @Override + public String toString() { + return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList + + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]"; + } + + public TopicRouteData getTopicRouteData() { + return topicRouteData; + } + + public void setTopicRouteData(final TopicRouteData topicRouteData) { + this.topicRouteData = topicRouteData; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java new file mode 100644 index 0000000..c5e25ce --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.latency; + +/** + * @author shijia.wxr + */ +public interface LatencyFaultTolerance<T> { + void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration); + + boolean isAvailable(final T name); + + void remove(final T name); + + T pickOneAtLeast(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java new file mode 100644 index 0000000..3bd7788 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.latency; + +import org.apache.rocketmq.client.common.ThreadLocalIndex; + +import java.util.Collections; +import java.util.Enumeration; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author shijia.wxr + */ +public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> { + private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16); + + private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(0); + + @Override + public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { + FaultItem old = this.faultItemTable.get(name); + if (null == old) { + final FaultItem faultItem = new FaultItem(name); + faultItem.setCurrentLatency(currentLatency); + faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); + + old = this.faultItemTable.putIfAbsent(name, faultItem); + if (old != null) { + old.setCurrentLatency(currentLatency); + old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); + } + } else { + old.setCurrentLatency(currentLatency); + old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); + } + } + + @Override + public boolean isAvailable(final String name) { + final FaultItem faultItem = this.faultItemTable.get(name); + if (faultItem != null) { + return faultItem.isAvailable(); + } + return true; + } + + @Override + public void remove(final String name) { + this.faultItemTable.remove(name); + } + + @Override + public String pickOneAtLeast() { + final Enumeration<FaultItem> elements = this.faultItemTable.elements(); + List<FaultItem> tmpList = new LinkedList<FaultItem>(); + while (elements.hasMoreElements()) { + final FaultItem faultItem = elements.nextElement(); + tmpList.add(faultItem); + } + + if (!tmpList.isEmpty()) { + Collections.shuffle(tmpList); + + Collections.sort(tmpList); + + final int half = tmpList.size() / 2; + if (half <= 0) { + return tmpList.get(0).getName(); + } else { + final int i = this.whichItemWorst.getAndIncrement() % half; + return tmpList.get(i).getName(); + } + } + + return null; + } + + class FaultItem implements Comparable<FaultItem> { + private final String name; + private volatile long currentLatency; + private volatile long startTimestamp; + + public FaultItem(final String name) { + this.name = name; + } + + @Override + public int compareTo(final FaultItem other) { + if (this.isAvailable() != other.isAvailable()) { + if (this.isAvailable()) return -1; + + if (other.isAvailable()) return 1; + } + + if (this.currentLatency < other.currentLatency) + return -1; + else if (this.currentLatency > other.currentLatency) { + return 1; + } + + if (this.startTimestamp < other.startTimestamp) + return -1; + else if (this.startTimestamp > other.startTimestamp) { + return 1; + } + + return 0; + } + + public boolean isAvailable() { + return (System.currentTimeMillis() - startTimestamp) >= 0; + } + + @Override + public int hashCode() { + int result = getName() != null ? getName().hashCode() : 0; + result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32)); + result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32)); + return result; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (!(o instanceof FaultItem)) return false; + + final FaultItem faultItem = (FaultItem) o; + + if (getCurrentLatency() != faultItem.getCurrentLatency()) return false; + if (getStartTimestamp() != faultItem.getStartTimestamp()) return false; + return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null; + + } + + @Override + public String toString() { + return "FaultItem{" + + "name='" + name + '\'' + + ", currentLatency=" + currentLatency + + ", startTimestamp=" + startTimestamp + + '}'; + } + + public String getName() { + return name; + } + + public long getCurrentLatency() { + return currentLatency; + } + + public void setCurrentLatency(final long currentLatency) { + this.currentLatency = currentLatency; + } + + public long getStartTimestamp() { + return startTimestamp; + } + + public void setStartTimestamp(final long startTimestamp) { + this.startTimestamp = startTimestamp; + } + + + } + + @Override + public String toString() { + return "LatencyFaultToleranceImpl{" + + "faultItemTable=" + faultItemTable + + ", whichItemWorst=" + whichItemWorst + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java new file mode 100644 index 0000000..6d32105 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.latency; + +import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.common.message.MessageQueue; + +/** + * @author shijia.wxr + */ +public class MQFaultStrategy { + private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); + + private boolean sendLatencyFaultEnable = false; + + private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; + private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; + + public long[] getNotAvailableDuration() { + return notAvailableDuration; + } + + public void setNotAvailableDuration(final long[] notAvailableDuration) { + this.notAvailableDuration = notAvailableDuration; + } + + public long[] getLatencyMax() { + return latencyMax; + } + + public void setLatencyMax(final long[] latencyMax) { + this.latencyMax = latencyMax; + } + + public boolean isSendLatencyFaultEnable() { + return sendLatencyFaultEnable; + } + + public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { + this.sendLatencyFaultEnable = sendLatencyFaultEnable; + } + + public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { + if (this.sendLatencyFaultEnable) { + try { + int index = tpInfo.getSendWhichQueue().getAndIncrement(); + for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { + int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); + if (pos < 0) + pos = 0; + MessageQueue mq = tpInfo.getMessageQueueList().get(pos); + if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { + if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) + return mq; + } + } + + final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); + int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); + if (writeQueueNums > 0) { + final MessageQueue mq = tpInfo.selectOneMessageQueue(); + if (notBestBroker != null) { + mq.setBrokerName(notBestBroker); + mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); + } + return mq; + } else { + latencyFaultTolerance.remove(notBestBroker); + } + } catch (Exception e) { + } + + return tpInfo.selectOneMessageQueue(); + } + + return tpInfo.selectOneMessageQueue(lastBrokerName); + } + + public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { + if (this.sendLatencyFaultEnable) { + long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); + this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); + } + } + + private long computeNotAvailableDuration(final long currentLatency) { + for (int i = latencyMax.length - 1; i >= 0; i--) { + if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; + } + + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java b/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java new file mode 100644 index 0000000..e4c5525 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.log; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.slf4j.ILoggerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Method; +import java.net.URL; + + +/** + * @author shijia.wxr + */ +public class ClientLogger { + private static Logger log; + public static final String CLIENT_LOG_ROOT = "rocketmq.client.logRoot"; + public static final String CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex"; + public static final String CLIENT_LOG_LEVEL = "rocketmq.client.logLevel"; + + static { + log = createLogger(LoggerName.CLIENT_LOGGER_NAME); + } + + + private static Logger createLogger(final String loggerName) { + String logConfigFilePath = + System.getProperty("rocketmq.client.log.configFile", + System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE")); + Boolean isloadconfig = + Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true")); + + final String log4JResourceFile = + System.getProperty("rocketmq.client.log4j.resource.fileName", "log4j_rocketmq_client.xml"); + + final String logbackResourceFile = + System.getProperty("rocketmq.client.logback.resource.fileName", "logback_rocketmq_client.xml"); + + String clientLogRoot = System.getProperty(CLIENT_LOG_ROOT, "${user.home}/logs/rocketmqlogs"); + System.setProperty("client.logRoot", clientLogRoot); + String clientLogLevel = System.getProperty(CLIENT_LOG_LEVEL, "INFO"); + System.setProperty("client.logLevel", clientLogLevel); + String clientLogMaxIndex = System.getProperty(CLIENT_LOG_MAXINDEX, "10"); + System.setProperty("client.logFileMaxIndex", clientLogMaxIndex); + + if (isloadconfig) { + try { + ILoggerFactory iLoggerFactory = LoggerFactory.getILoggerFactory(); + Class classType = iLoggerFactory.getClass(); + if (classType.getName().equals("org.slf4j.impl.Log4jLoggerFactory")) { + Class<?> domconfigurator; + Object domconfiguratorobj; + domconfigurator = Class.forName("org.apache.log4j.xml.DOMConfigurator"); + domconfiguratorobj = domconfigurator.newInstance(); + if (null == logConfigFilePath) { + Method configure = domconfiguratorobj.getClass().getMethod("configure", URL.class); + URL url = ClientLogger.class.getClassLoader().getResource(log4JResourceFile); + configure.invoke(domconfiguratorobj, url); + } else { + Method configure = domconfiguratorobj.getClass().getMethod("configure", String.class); + configure.invoke(domconfiguratorobj, logConfigFilePath); + } + + } else if (classType.getName().equals("ch.qos.logback.classic.LoggerContext")) { + Class<?> joranConfigurator; + Class<?> context = Class.forName("ch.qos.logback.core.Context"); + Object joranConfiguratoroObj; + joranConfigurator = Class.forName("ch.qos.logback.classic.joran.JoranConfigurator"); + joranConfiguratoroObj = joranConfigurator.newInstance(); + Method setContext = joranConfiguratoroObj.getClass().getMethod("setContext", context); + setContext.invoke(joranConfiguratoroObj, iLoggerFactory); + if (null == logConfigFilePath) { + URL url = ClientLogger.class.getClassLoader().getResource(logbackResourceFile); + Method doConfigure = + joranConfiguratoroObj.getClass().getMethod("doConfigure", URL.class); + doConfigure.invoke(joranConfiguratoroObj, url); + } else { + Method doConfigure = + joranConfiguratoroObj.getClass().getMethod("doConfigure", String.class); + doConfigure.invoke(joranConfiguratoroObj, logConfigFilePath); + } + + } + } catch (Exception e) { + System.err.println(e); + } + } + return LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME); + } + + + public static Logger getLog() { + return log; + } + + + public static void setLog(Logger log) { + ClientLogger.log = log; + } +}