http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java deleted file mode 100644 index b82cde9..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ /dev/null @@ -1,1080 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl.producer; - -import com.alibaba.rocketmq.client.QueryResult; -import com.alibaba.rocketmq.client.Validators; -import com.alibaba.rocketmq.client.common.ClientErrorCode; -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.hook.CheckForbiddenContext; -import com.alibaba.rocketmq.client.hook.CheckForbiddenHook; -import com.alibaba.rocketmq.client.hook.SendMessageContext; -import com.alibaba.rocketmq.client.hook.SendMessageHook; -import com.alibaba.rocketmq.client.impl.CommunicationMode; -import com.alibaba.rocketmq.client.impl.MQClientManager; -import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; -import com.alibaba.rocketmq.client.latency.MQFaultStrategy; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.client.producer.*; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.ServiceState; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.help.FAQUrl; -import com.alibaba.rocketmq.common.message.*; -import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.EndTransactionRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader; -import com.alibaba.rocketmq.common.sysflag.MessageSysFlag; -import com.alibaba.rocketmq.remoting.RPCHook; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; -import com.alibaba.rocketmq.remoting.exception.RemotingException; -import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; -import org.slf4j.Logger; - -import java.io.IOException; -import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.*; - - -/** - * @author shijia.wxr - */ -public class DefaultMQProducerImpl implements MQProducerInner { - private final Logger log = ClientLogger.getLog(); - private final Random random = new Random(); - private final DefaultMQProducer defaultMQProducer; - private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = - new ConcurrentHashMap<String, TopicPublishInfo>(); - private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); - private final RPCHook rpcHook; - protected BlockingQueue<Runnable> checkRequestQueue; - protected ExecutorService checkExecutor; - private ServiceState serviceState = ServiceState.CREATE_JUST; - private MQClientInstance mQClientFactory; - private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>(); - private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); - - private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); - - - public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { - this(defaultMQProducer, null); - } - - - public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { - this.defaultMQProducer = defaultMQProducer; - this.rpcHook = rpcHook; - } - - public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) { - this.checkForbiddenHookList.add(checkForbiddenHook); - log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(), - checkForbiddenHookList.size()); - } - - public void initTransactionEnv() { - TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer; - this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax()); - this.checkExecutor = new ThreadPoolExecutor(// - producer.getCheckThreadPoolMinSize(), // - producer.getCheckThreadPoolMaxSize(), // - 1000 * 60, // - TimeUnit.MILLISECONDS, // - this.checkRequestQueue); - } - - public void destroyTransactionEnv() { - this.checkExecutor.shutdown(); - this.checkRequestQueue.clear(); - } - - public void registerSendMessageHook(final SendMessageHook hook) { - this.sendMessageHookList.add(hook); - log.info("register sendMessage Hook, {}", hook.hookName()); - } - - public void start() throws MQClientException { - this.start(true); - } - - public void start(final boolean startFactory) throws MQClientException { - switch (this.serviceState) { - case CREATE_JUST: - this.serviceState = ServiceState.START_FAILED; - - this.checkConfig(); - - if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { - this.defaultMQProducer.changeInstanceNameToPID(); - } - - this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); - - boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); - if (!registerOK) { - this.serviceState = ServiceState.CREATE_JUST; - throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() - + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), - null); - } - - this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); - - if (startFactory) { - mQClientFactory.start(); - } - - log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), - this.defaultMQProducer.isSendMessageWithVIPChannel()); - this.serviceState = ServiceState.RUNNING; - break; - case RUNNING: - case START_FAILED: - case SHUTDOWN_ALREADY: - throw new MQClientException("The producer service state not OK, maybe started once, "// - + this.serviceState// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), - null); - default: - break; - } - - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); - } - - private void checkConfig() throws MQClientException { - Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); - - if (null == this.defaultMQProducer.getProducerGroup()) { - throw new MQClientException("producerGroup is null", null); - } - - if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) { - throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.", - null); - } - } - - public void shutdown() { - this.shutdown(true); - } - - public void shutdown(final boolean shutdownFactory) { - switch (this.serviceState) { - case CREATE_JUST: - break; - case RUNNING: - this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup()); - if (shutdownFactory) { - this.mQClientFactory.shutdown(); - } - - log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup()); - this.serviceState = ServiceState.SHUTDOWN_ALREADY; - break; - case SHUTDOWN_ALREADY: - break; - default: - break; - } - } - - @Override - public Set<String> getPublishTopicList() { - Set<String> topicList = new HashSet<String>(); - for (String key : this.topicPublishInfoTable.keySet()) { - topicList.add(key); - } - - return topicList; - } - - @Override - public boolean isPublishTopicNeedUpdate(String topic) { - TopicPublishInfo prev = this.topicPublishInfoTable.get(topic); - - return null == prev || !prev.ok(); - } - - @Override - public TransactionCheckListener checkListener() { - if (this.defaultMQProducer instanceof TransactionMQProducer) { - TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; - return producer.getTransactionCheckListener(); - } - - return null; - } - - @Override - public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) { - Runnable request = new Runnable() { - private final String brokerAddr = addr; - private final MessageExt message = msg; - private final CheckTransactionStateRequestHeader checkRequestHeader = header; - private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup(); - - - @Override - public void run() { - TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); - if (transactionCheckListener != null) { - LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; - Throwable exception = null; - try { - localTransactionState = transactionCheckListener.checkLocalTransactionState(message); - } catch (Throwable e) { - log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); - exception = e; - } - - this.processTransactionState(// - localTransactionState, // - group, // - exception); - } else { - log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group); - } - } - - - private void processTransactionState(// - final LocalTransactionState localTransactionState, // - final String producerGroup, // - final Throwable exception) { - final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader(); - thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset()); - thisHeader.setProducerGroup(producerGroup); - thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset()); - thisHeader.setFromTransactionCheck(true); - - String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); - if (uniqueKey == null) { - uniqueKey = message.getMsgId(); - } - thisHeader.setMsgId(uniqueKey); - thisHeader.setTransactionId(checkRequestHeader.getTransactionId()); - switch (localTransactionState) { - case COMMIT_MESSAGE: - thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); - break; - case ROLLBACK_MESSAGE: - thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); - log.warn("when broker check, client rollback this transaction, {}", thisHeader); - break; - case UNKNOW: - thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); - log.warn("when broker check, client does not know this transaction state, {}", thisHeader); - break; - default: - break; - } - - String remark = null; - if (exception != null) { - remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception); - } - - try { - DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, - 3000); - } catch (Exception e) { - log.error("endTransactionOneway exception", e); - } - } - }; - - this.checkExecutor.submit(request); - } - - @Override - public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) { - if (info != null && topic != null) { - TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info); - if (prev != null) { - log.info("updateTopicPublishInfo prev is not null, " + prev.toString()); - } - } - } - - @Override - public boolean isUnitMode() { - return this.defaultMQProducer.isUnitMode(); - } - - public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { - createTopic(key, newTopic, queueNum, 0); - } - - public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { - this.makeSureStateOK(); - Validators.checkTopic(newTopic); - - this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); - } - - private void makeSureStateOK() throws MQClientException { - if (this.serviceState != ServiceState.RUNNING) { - throw new MQClientException("The producer service state not OK, "// - + this.serviceState// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), - null); - } - } - - public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic); - } - - public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); - } - - public long maxOffset(MessageQueue mq) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().maxOffset(mq); - } - - public long minOffset(MessageQueue mq) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().minOffset(mq); - } - - public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq); - } - - public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - this.makeSureStateOK(); - - return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId); - } - - public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) - throws MQClientException, InterruptedException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); - } - - public MessageExt queryMessageByUniqKey(String topic, String uniqKey) - throws MQClientException, InterruptedException { - this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); - } - - /** - * DEFAULT ASYNC ------------------------------------------------------- - */ - public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { - send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); - } - - public void send(Message msg, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { - try { - this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout); - } catch (MQBrokerException e) { - throw new MQClientException("unknownn exception", e); - } - } - - public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { - return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); - } - - public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { - this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); - } - - private SendResult sendDefaultImpl(// - Message msg, // - final CommunicationMode communicationMode, // - final SendCallback sendCallback, // - final long timeout// - ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - this.makeSureStateOK(); - Validators.checkMessage(msg, this.defaultMQProducer); - - final long invokeID = random.nextLong(); - long beginTimestampFirst = System.currentTimeMillis(); - long beginTimestampPrev = beginTimestampFirst; - long endTimestamp = beginTimestampFirst; - TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); - if (topicPublishInfo != null && topicPublishInfo.ok()) { - MessageQueue mq = null; - Exception exception = null; - SendResult sendResult = null; - int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; - int times = 0; - String[] brokersSent = new String[timesTotal]; - for (; times < timesTotal; times++) { - String lastBrokerName = null == mq ? null : mq.getBrokerName(); - MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); - if (tmpmq != null) { - mq = tmpmq; - brokersSent[times] = mq.getBrokerName(); - try { - beginTimestampPrev = System.currentTimeMillis(); - sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); - endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); - switch (communicationMode) { - case ASYNC: - return null; - case ONEWAY: - return null; - case SYNC: - if (sendResult.getSendStatus() != SendStatus.SEND_OK) { - if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { - continue; - } - } - - return sendResult; - default: - break; - } - } catch (RemotingException e) { - endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); - log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); - log.warn(msg.toString()); - exception = e; - continue; - } catch (MQClientException e) { - endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); - log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); - log.warn(msg.toString()); - exception = e; - continue; - } catch (MQBrokerException e) { - endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); - log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); - log.warn(msg.toString()); - exception = e; - switch (e.getResponseCode()) { - case ResponseCode.TOPIC_NOT_EXIST: - case ResponseCode.SERVICE_NOT_AVAILABLE: - case ResponseCode.SYSTEM_ERROR: - case ResponseCode.NO_PERMISSION: - case ResponseCode.NO_BUYER_ID: - case ResponseCode.NOT_IN_CURRENT_UNIT: - continue; - default: - if (sendResult != null) { - return sendResult; - } - - throw e; - } - } catch (InterruptedException e) { - endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); - log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); - log.warn(msg.toString()); - - log.warn("sendKernelImpl exception", e); - log.warn(msg.toString()); - throw e; - } - } else { - break; - } - } - - if (sendResult != null) { - return sendResult; - } - - String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", - times, - System.currentTimeMillis() - beginTimestampFirst, - msg.getTopic(), - Arrays.toString(brokersSent)); - - info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); - - MQClientException mqClientException = new MQClientException(info, exception); - if (exception instanceof MQBrokerException) { - mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); - } else if (exception instanceof RemotingConnectException) { - mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); - } else if (exception instanceof RemotingTimeoutException) { - mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); - } else if (exception instanceof MQClientException) { - mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); - } - - throw mqClientException; - } - - List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); - if (null == nsList || nsList.isEmpty()) { - throw new MQClientException( - "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); - } - - throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), - null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); - } - - private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { - TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); - if (null == topicPublishInfo || !topicPublishInfo.ok()) { - this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); - this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); - topicPublishInfo = this.topicPublishInfoTable.get(topic); - } - - if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { - return topicPublishInfo; - } else { - this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); - topicPublishInfo = this.topicPublishInfoTable.get(topic); - return topicPublishInfo; - } - } - - private SendResult sendKernelImpl(final Message msg, // - final MessageQueue mq, // - final CommunicationMode communicationMode, // - final SendCallback sendCallback, // - final TopicPublishInfo topicPublishInfo, // - final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); - if (null == brokerAddr) { - tryToFindTopicPublishInfo(mq.getTopic()); - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); - } - - SendMessageContext context = null; - if (brokerAddr != null) { - brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); - - byte[] prevBody = msg.getBody(); - try { - - MessageClientIDSetter.setUniqID(msg); - - int sysFlag = 0; - if (this.tryToCompressMessage(msg)) { - sysFlag |= MessageSysFlag.COMPRESSED_FLAG; - } - - final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); - if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { - sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; - } - - if (hasCheckForbiddenHook()) { - CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); - checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); - checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); - checkForbiddenContext.setCommunicationMode(communicationMode); - checkForbiddenContext.setBrokerAddr(brokerAddr); - checkForbiddenContext.setMessage(msg); - checkForbiddenContext.setMq(mq); - checkForbiddenContext.setUnitMode(this.isUnitMode()); - this.executeCheckForbiddenHook(checkForbiddenContext); - } - - if (this.hasSendMessageHook()) { - context = new SendMessageContext(); - context.setProducer(this); - context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); - context.setCommunicationMode(communicationMode); - context.setBornHost(this.defaultMQProducer.getClientIP()); - context.setBrokerAddr(brokerAddr); - context.setMessage(msg); - context.setMq(mq); - String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); - if (isTrans != null && isTrans.equals("true")) { - context.setMsgType(MessageType.Trans_Msg_Half); - } - - if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { - context.setMsgType(MessageType.Delay_Msg); - } - this.executeSendMessageHookBefore(context); - } - - SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); - requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); - requestHeader.setTopic(msg.getTopic()); - requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); - requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); - requestHeader.setQueueId(mq.getQueueId()); - requestHeader.setSysFlag(sysFlag); - requestHeader.setBornTimestamp(System.currentTimeMillis()); - requestHeader.setFlag(msg.getFlag()); - requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); - requestHeader.setReconsumeTimes(0); - requestHeader.setUnitMode(this.isUnitMode()); - if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); - if (reconsumeTimes != null) { - requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); - MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); - } - - String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); - if (maxReconsumeTimes != null) { - requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); - MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); - } - } - - SendResult sendResult = null; - switch (communicationMode) { - case ASYNC: - sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(// - brokerAddr, // 1 - mq.getBrokerName(), // 2 - msg, // 3 - requestHeader, // 4 - timeout, // 5 - communicationMode, // 6 - sendCallback, // 7 - topicPublishInfo, // 8 - this.mQClientFactory, // 9 - this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10 - context, // - this); - break; - case ONEWAY: - case SYNC: - sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( - brokerAddr, - mq.getBrokerName(), - msg, - requestHeader, - timeout, - communicationMode, - context, - this); - break; - default: - assert false; - break; - } - - if (this.hasSendMessageHook()) { - context.setSendResult(sendResult); - this.executeSendMessageHookAfter(context); - } - - return sendResult; - } catch (RemotingException e) { - if (this.hasSendMessageHook()) { - context.setException(e); - this.executeSendMessageHookAfter(context); - } - throw e; - } catch (MQBrokerException e) { - if (this.hasSendMessageHook()) { - context.setException(e); - this.executeSendMessageHookAfter(context); - } - throw e; - } catch (InterruptedException e) { - if (this.hasSendMessageHook()) { - context.setException(e); - this.executeSendMessageHookAfter(context); - } - throw e; - } finally { - msg.setBody(prevBody); - } - } - - throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); - } - - public MQClientInstance getmQClientFactory() { - return mQClientFactory; - } - - private boolean tryToCompressMessage(final Message msg) { - byte[] body = msg.getBody(); - if (body != null) { - if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) { - try { - byte[] data = UtilAll.compress(body, zipCompressLevel); - if (data != null) { - msg.setBody(data); - return true; - } - } catch (IOException e) { - log.error("tryToCompressMessage exception", e); - log.warn(msg.toString()); - } - } - } - - return false; - } - - public boolean hasCheckForbiddenHook() { - return !checkForbiddenHookList.isEmpty(); - } - - public void executeCheckForbiddenHook(final CheckForbiddenContext context) throws MQClientException { - if (hasCheckForbiddenHook()) { - for (CheckForbiddenHook hook : checkForbiddenHookList) { - hook.checkForbidden(context); - } - } - } - - public boolean hasSendMessageHook() { - return !this.sendMessageHookList.isEmpty(); - } - - public void executeSendMessageHookBefore(final SendMessageContext context) { - if (!this.sendMessageHookList.isEmpty()) { - for (SendMessageHook hook : this.sendMessageHookList) { - try { - hook.sendMessageBefore(context); - } catch (Throwable e) { - log.warn("failed to executeSendMessageHookBefore", e); - } - } - } - } - - public void executeSendMessageHookAfter(final SendMessageContext context) { - if (!this.sendMessageHookList.isEmpty()) { - for (SendMessageHook hook : this.sendMessageHookList) { - try { - hook.sendMessageAfter(context); - } catch (Throwable e) { - log.warn("failed to executeSendMessageHookAfter", e); - } - } - } - } - - /** - * DEFAULT ONEWAY ------------------------------------------------------- - */ - public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { - try { - this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout()); - } catch (MQBrokerException e) { - throw new MQClientException("unknown exception", e); - } - } - - /** - * KERNEL SYNC ------------------------------------------------------- - */ - public SendResult send(Message msg, MessageQueue mq) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return send(msg, mq, this.defaultMQProducer.getSendMsgTimeout()); - } - - public SendResult send(Message msg, MessageQueue mq, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - this.makeSureStateOK(); - Validators.checkMessage(msg, this.defaultMQProducer); - - if (!msg.getTopic().equals(mq.getTopic())) { - throw new MQClientException("message's topic not equal mq's topic", null); - } - - return this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout); - } - - /** - * KERNEL ASYNC ------------------------------------------------------- - */ - public void send(Message msg, MessageQueue mq, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { - send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); - } - - public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { - this.makeSureStateOK(); - Validators.checkMessage(msg, this.defaultMQProducer); - - if (!msg.getTopic().equals(mq.getTopic())) { - throw new MQClientException("message's topic not equal mq's topic", null); - } - - try { - this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout); - } catch (MQBrokerException e) { - throw new MQClientException("unknown exception", e); - } - } - - /** - * KERNEL ONEWAY ------------------------------------------------------- - */ - public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { - this.makeSureStateOK(); - Validators.checkMessage(msg, this.defaultMQProducer); - - try { - this.sendKernelImpl(msg, mq, CommunicationMode.ONEWAY, null, null, this.defaultMQProducer.getSendMsgTimeout()); - } catch (MQBrokerException e) { - throw new MQClientException("unknown exception", e); - } - } - - /** - * SELECT SYNC ------------------------------------------------------- - */ - public SendResult send(Message msg, MessageQueueSelector selector, Object arg) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout()); - } - - public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout); - } - - private SendResult sendSelectImpl(// - Message msg, // - MessageQueueSelector selector, // - Object arg, // - final CommunicationMode communicationMode, // - final SendCallback sendCallback, final long timeout// - ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - this.makeSureStateOK(); - Validators.checkMessage(msg, this.defaultMQProducer); - - TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); - if (topicPublishInfo != null && topicPublishInfo.ok()) { - MessageQueue mq = null; - try { - mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); - } catch (Throwable e) { - throw new MQClientException("select message queue throwed exception.", e); - } - - if (mq != null) { - return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout); - } else { - throw new MQClientException("select message queue return null.", null); - } - } - - throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); - } - - /** - * SELECT ASYNC ------------------------------------------------------- - */ - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { - send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); - } - - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { - try { - this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout); - } catch (MQBrokerException e) { - throw new MQClientException("unknownn exception", e); - } - } - - /** - * SELECT ONEWAY ------------------------------------------------------- - */ - public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) - throws MQClientException, RemotingException, InterruptedException { - try { - this.sendSelectImpl(msg, selector, arg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout()); - } catch (MQBrokerException e) { - throw new MQClientException("unknown exception", e); - } - } - - public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) - throws MQClientException { - if (null == tranExecuter) { - throw new MQClientException("tranExecutor is null", null); - } - Validators.checkMessage(msg, this.defaultMQProducer); - - SendResult sendResult = null; - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); - try { - sendResult = this.send(msg); - } catch (Exception e) { - throw new MQClientException("send message Exception", e); - } - - LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; - Throwable localException = null; - switch (sendResult.getSendStatus()) { - case SEND_OK: { - try { - if (sendResult.getTransactionId() != null) { - msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); - } - localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); - if (null == localTransactionState) { - localTransactionState = LocalTransactionState.UNKNOW; - } - - if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { - log.info("executeLocalTransactionBranch return {}", localTransactionState); - log.info(msg.toString()); - } - } catch (Throwable e) { - log.info("executeLocalTransactionBranch exception", e); - log.info(msg.toString()); - localException = e; - } - } - break; - case FLUSH_DISK_TIMEOUT: - case FLUSH_SLAVE_TIMEOUT: - case SLAVE_NOT_AVAILABLE: - localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; - break; - default: - break; - } - - try { - this.endTransaction(sendResult, localTransactionState, localException); - } catch (Exception e) { - log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); - } - - TransactionSendResult transactionSendResult = new TransactionSendResult(); - transactionSendResult.setSendStatus(sendResult.getSendStatus()); - transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); - transactionSendResult.setMsgId(sendResult.getMsgId()); - transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); - transactionSendResult.setTransactionId(sendResult.getTransactionId()); - transactionSendResult.setLocalTransactionState(localTransactionState); - return transactionSendResult; - } - - /** - * DEFAULT SYNC ------------------------------------------------------- - */ - public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return send(msg, this.defaultMQProducer.getSendMsgTimeout()); - } - - public void endTransaction(// - final SendResult sendResult, // - final LocalTransactionState localTransactionState, // - final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { - final MessageId id; - if (sendResult.getOffsetMsgId() != null) { - id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); - } else { - id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); - } - String transactionId = sendResult.getTransactionId(); - final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); - EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); - requestHeader.setTransactionId(transactionId); - requestHeader.setCommitLogOffset(id.getOffset()); - switch (localTransactionState) { - case COMMIT_MESSAGE: - requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); - break; - case ROLLBACK_MESSAGE: - requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); - break; - case UNKNOW: - requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); - break; - default: - break; - } - - requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); - requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); - requestHeader.setMsgId(sendResult.getMsgId()); - String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; - this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, - this.defaultMQProducer.getSendMsgTimeout()); - } - - public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); - } - - public ConcurrentHashMap<String, TopicPublishInfo> getTopicPublishInfoTable() { - return topicPublishInfoTable; - } - - public int getZipCompressLevel() { - return zipCompressLevel; - } - - - public void setZipCompressLevel(int zipCompressLevel) { - this.zipCompressLevel = zipCompressLevel; - } - - - public ServiceState getServiceState() { - return serviceState; - } - - - public void setServiceState(ServiceState serviceState) { - this.serviceState = serviceState; - } - - public long[] getNotAvailableDuration() { - return this.mqFaultStrategy.getNotAvailableDuration(); - } - - public void setNotAvailableDuration(final long[] notAvailableDuration) { - this.mqFaultStrategy.setNotAvailableDuration(notAvailableDuration); - } - - public long[] getLatencyMax() { - return this.mqFaultStrategy.getLatencyMax(); - } - - public void setLatencyMax(final long[] latencyMax) { - this.mqFaultStrategy.setLatencyMax(latencyMax); - } - - public boolean isSendLatencyFaultEnable() { - return this.mqFaultStrategy.isSendLatencyFaultEnable(); - } - - public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { - this.mqFaultStrategy.setSendLatencyFaultEnable(sendLatencyFaultEnable); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java deleted file mode 100644 index e2837e2..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl.producer; - -import com.alibaba.rocketmq.client.producer.TransactionCheckListener; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; - -import java.util.Set; - - -/** - * @author shijia.wxr - */ -public interface MQProducerInner { - Set<String> getPublishTopicList(); - - - boolean isPublishTopicNeedUpdate(final String topic); - - - TransactionCheckListener checkListener(); - - - void checkTransactionState(// - final String addr, // - final MessageExt msg, // - final CheckTransactionStateRequestHeader checkRequestHeader); - - - void updateTopicPublishInfo(final String topic, final TopicPublishInfo info); - - - boolean isUnitMode(); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java deleted file mode 100644 index 2f7de22..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl.producer; - -import com.alibaba.rocketmq.client.common.ThreadLocalIndex; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.route.QueueData; -import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; - -import java.util.ArrayList; -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class TopicPublishInfo { - private boolean orderTopic = false; - private boolean haveTopicRouterInfo = false; - private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); - private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0); - private TopicRouteData topicRouteData; - - - public boolean isOrderTopic() { - return orderTopic; - } - - public void setOrderTopic(boolean orderTopic) { - this.orderTopic = orderTopic; - } - - public boolean ok() { - return null != this.messageQueueList && !this.messageQueueList.isEmpty(); - } - - public List<MessageQueue> getMessageQueueList() { - return messageQueueList; - } - - - public void setMessageQueueList(List<MessageQueue> messageQueueList) { - this.messageQueueList = messageQueueList; - } - - - public ThreadLocalIndex getSendWhichQueue() { - return sendWhichQueue; - } - - - public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) { - this.sendWhichQueue = sendWhichQueue; - } - - - public boolean isHaveTopicRouterInfo() { - return haveTopicRouterInfo; - } - - - public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) { - this.haveTopicRouterInfo = haveTopicRouterInfo; - } - - - public MessageQueue selectOneMessageQueue(final String lastBrokerName) { - if (lastBrokerName == null) { - return selectOneMessageQueue(); - } else { - int index = this.sendWhichQueue.getAndIncrement(); - for (int i = 0; i < this.messageQueueList.size(); i++) { - int pos = Math.abs(index++) % this.messageQueueList.size(); - if (pos < 0) - pos = 0; - MessageQueue mq = this.messageQueueList.get(pos); - if (!mq.getBrokerName().equals(lastBrokerName)) { - return mq; - } - } - return selectOneMessageQueue(); - } - } - - - public MessageQueue selectOneMessageQueue() { - int index = this.sendWhichQueue.getAndIncrement(); - int pos = Math.abs(index) % this.messageQueueList.size(); - if (pos < 0) - pos = 0; - return this.messageQueueList.get(pos); - } - - public int getQueueIdByBroker(final String brokerName) { - for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) { - final QueueData queueData = this.topicRouteData.getQueueDatas().get(i); - if (queueData.getBrokerName().equals(brokerName)) { - return queueData.getWriteQueueNums(); - } - } - - return -1; - } - - - @Override - public String toString() { - return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList - + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]"; - } - - public TopicRouteData getTopicRouteData() { - return topicRouteData; - } - - public void setTopicRouteData(final TopicRouteData topicRouteData) { - this.topicRouteData = topicRouteData; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java b/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java deleted file mode 100644 index e6152e4..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.client.latency; - -/** - * @author shijia.wxr - */ -public interface LatencyFaultTolerance<T> { - void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration); - - boolean isAvailable(final T name); - - void remove(final T name); - - T pickOneAtLeast(); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java deleted file mode 100644 index 8a86449..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.client.latency; - -import com.alibaba.rocketmq.client.common.ThreadLocalIndex; - -import java.util.Collections; -import java.util.Enumeration; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -/** - * @author shijia.wxr - */ -public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> { - private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16); - - private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(0); - - @Override - public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { - FaultItem old = this.faultItemTable.get(name); - if (null == old) { - final FaultItem faultItem = new FaultItem(name); - faultItem.setCurrentLatency(currentLatency); - faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); - - old = this.faultItemTable.putIfAbsent(name, faultItem); - if (old != null) { - old.setCurrentLatency(currentLatency); - old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); - } - } else { - old.setCurrentLatency(currentLatency); - old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); - } - } - - @Override - public boolean isAvailable(final String name) { - final FaultItem faultItem = this.faultItemTable.get(name); - if (faultItem != null) { - return faultItem.isAvailable(); - } - return true; - } - - @Override - public void remove(final String name) { - this.faultItemTable.remove(name); - } - - @Override - public String pickOneAtLeast() { - final Enumeration<FaultItem> elements = this.faultItemTable.elements(); - List<FaultItem> tmpList = new LinkedList<FaultItem>(); - while (elements.hasMoreElements()) { - final FaultItem faultItem = elements.nextElement(); - tmpList.add(faultItem); - } - - if (!tmpList.isEmpty()) { - Collections.shuffle(tmpList); - - Collections.sort(tmpList); - - final int half = tmpList.size() / 2; - if (half <= 0) { - return tmpList.get(0).getName(); - } else { - final int i = this.whichItemWorst.getAndIncrement() % half; - return tmpList.get(i).getName(); - } - } - - return null; - } - - class FaultItem implements Comparable<FaultItem> { - private final String name; - private volatile long currentLatency; - private volatile long startTimestamp; - - public FaultItem(final String name) { - this.name = name; - } - - @Override - public int compareTo(final FaultItem other) { - if (this.isAvailable() != other.isAvailable()) { - if (this.isAvailable()) return -1; - - if (other.isAvailable()) return 1; - } - - if (this.currentLatency < other.currentLatency) - return -1; - else if (this.currentLatency > other.currentLatency) { - return 1; - } - - if (this.startTimestamp < other.startTimestamp) - return -1; - else if (this.startTimestamp > other.startTimestamp) { - return 1; - } - - return 0; - } - - public boolean isAvailable() { - return (System.currentTimeMillis() - startTimestamp) >= 0; - } - - @Override - public int hashCode() { - int result = getName() != null ? getName().hashCode() : 0; - result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32)); - result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32)); - return result; - } - - @Override - public boolean equals(final Object o) { - if (this == o) return true; - if (!(o instanceof FaultItem)) return false; - - final FaultItem faultItem = (FaultItem) o; - - if (getCurrentLatency() != faultItem.getCurrentLatency()) return false; - if (getStartTimestamp() != faultItem.getStartTimestamp()) return false; - return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null; - - } - - @Override - public String toString() { - return "FaultItem{" + - "name='" + name + '\'' + - ", currentLatency=" + currentLatency + - ", startTimestamp=" + startTimestamp + - '}'; - } - - public String getName() { - return name; - } - - public long getCurrentLatency() { - return currentLatency; - } - - public void setCurrentLatency(final long currentLatency) { - this.currentLatency = currentLatency; - } - - public long getStartTimestamp() { - return startTimestamp; - } - - public void setStartTimestamp(final long startTimestamp) { - this.startTimestamp = startTimestamp; - } - - - } - - @Override - public String toString() { - return "LatencyFaultToleranceImpl{" + - "faultItemTable=" + faultItemTable + - ", whichItemWorst=" + whichItemWorst + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java deleted file mode 100644 index b323f04..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.client.latency; - -import com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo; -import com.alibaba.rocketmq.common.message.MessageQueue; - -/** - * @author shijia.wxr - */ -public class MQFaultStrategy { - private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); - - private boolean sendLatencyFaultEnable = false; - - private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; - private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; - - public long[] getNotAvailableDuration() { - return notAvailableDuration; - } - - public void setNotAvailableDuration(final long[] notAvailableDuration) { - this.notAvailableDuration = notAvailableDuration; - } - - public long[] getLatencyMax() { - return latencyMax; - } - - public void setLatencyMax(final long[] latencyMax) { - this.latencyMax = latencyMax; - } - - public boolean isSendLatencyFaultEnable() { - return sendLatencyFaultEnable; - } - - public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { - this.sendLatencyFaultEnable = sendLatencyFaultEnable; - } - - public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { - if (this.sendLatencyFaultEnable) { - try { - int index = tpInfo.getSendWhichQueue().getAndIncrement(); - for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { - int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); - if (pos < 0) - pos = 0; - MessageQueue mq = tpInfo.getMessageQueueList().get(pos); - if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { - if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) - return mq; - } - } - - final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); - int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); - if (writeQueueNums > 0) { - final MessageQueue mq = tpInfo.selectOneMessageQueue(); - if (notBestBroker != null) { - mq.setBrokerName(notBestBroker); - mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); - } - return mq; - } else { - latencyFaultTolerance.remove(notBestBroker); - } - } catch (Exception e) { - } - - return tpInfo.selectOneMessageQueue(); - } - - return tpInfo.selectOneMessageQueue(lastBrokerName); - } - - public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { - if (this.sendLatencyFaultEnable) { - long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); - this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); - } - } - - private long computeNotAvailableDuration(final long currentLatency) { - for (int i = latencyMax.length - 1; i >= 0; i--) { - if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; - } - - return 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java b/client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java deleted file mode 100644 index 02af207..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.log; - -import com.alibaba.rocketmq.common.constant.LoggerName; -import org.slf4j.ILoggerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Method; -import java.net.URL; - - -/** - * @author shijia.wxr - */ -public class ClientLogger { - private static Logger log; - public static final String CLIENT_LOG_ROOT = "rocketmq.client.logRoot"; - public static final String CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex"; - public static final String CLIENT_LOG_LEVEL = "rocketmq.client.logLevel"; - - static { - log = createLogger(LoggerName.CLIENT_LOGGER_NAME); - } - - - private static Logger createLogger(final String loggerName) { - String logConfigFilePath = - System.getProperty("rocketmq.client.log.configFile", - System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE")); - Boolean isloadconfig = - Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true")); - - final String log4JResourceFile = - System.getProperty("rocketmq.client.log4j.resource.fileName", "log4j_rocketmq_client.xml"); - - final String logbackResourceFile = - System.getProperty("rocketmq.client.logback.resource.fileName", "logback_rocketmq_client.xml"); - - String clientLogRoot = System.getProperty(CLIENT_LOG_ROOT, "${user.home}/logs/rocketmqlogs"); - System.setProperty("client.logRoot", clientLogRoot); - String clientLogLevel = System.getProperty(CLIENT_LOG_LEVEL, "INFO"); - System.setProperty("client.logLevel", clientLogLevel); - String clientLogMaxIndex = System.getProperty(CLIENT_LOG_MAXINDEX, "10"); - System.setProperty("client.logFileMaxIndex", clientLogMaxIndex); - - if (isloadconfig) { - try { - ILoggerFactory iLoggerFactory = LoggerFactory.getILoggerFactory(); - Class classType = iLoggerFactory.getClass(); - if (classType.getName().equals("org.slf4j.impl.Log4jLoggerFactory")) { - Class<?> domconfigurator; - Object domconfiguratorobj; - domconfigurator = Class.forName("org.apache.log4j.xml.DOMConfigurator"); - domconfiguratorobj = domconfigurator.newInstance(); - if (null == logConfigFilePath) { - Method configure = domconfiguratorobj.getClass().getMethod("configure", URL.class); - URL url = ClientLogger.class.getClassLoader().getResource(log4JResourceFile); - configure.invoke(domconfiguratorobj, url); - } else { - Method configure = domconfiguratorobj.getClass().getMethod("configure", String.class); - configure.invoke(domconfiguratorobj, logConfigFilePath); - } - - } else if (classType.getName().equals("ch.qos.logback.classic.LoggerContext")) { - Class<?> joranConfigurator; - Class<?> context = Class.forName("ch.qos.logback.core.Context"); - Object joranConfiguratoroObj; - joranConfigurator = Class.forName("ch.qos.logback.classic.joran.JoranConfigurator"); - joranConfiguratoroObj = joranConfigurator.newInstance(); - Method setContext = joranConfiguratoroObj.getClass().getMethod("setContext", context); - setContext.invoke(joranConfiguratoroObj, iLoggerFactory); - if (null == logConfigFilePath) { - URL url = ClientLogger.class.getClassLoader().getResource(logbackResourceFile); - Method doConfigure = - joranConfiguratoroObj.getClass().getMethod("doConfigure", URL.class); - doConfigure.invoke(joranConfiguratoroObj, url); - } else { - Method doConfigure = - joranConfiguratoroObj.getClass().getMethod("doConfigure", String.class); - doConfigure.invoke(joranConfiguratoroObj, logConfigFilePath); - } - - } - } catch (Exception e) { - System.err.println(e); - } - } - return LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME); - } - - - public static Logger getLog() { - return log; - } - - - public static void setLog(Logger log) { - ClientLogger.log = log; - } -}
