http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 6e0e379..42b7a82 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -16,11 +16,40 @@ */ package org.apache.rocketmq.client.impl.factory; +import java.io.UnsupportedEncodingException; +import java.net.DatagramSocket; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.admin.MQAdminExtInner; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.impl.*; +import org.apache.rocketmq.client.impl.ClientRemotingProcessor; +import org.apache.rocketmq.client.impl.FindBrokerResult; +import org.apache.rocketmq.client.impl.MQAdminImpl; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import org.apache.rocketmq.client.impl.consumer.MQConsumerInner; +import org.apache.rocketmq.client.impl.consumer.ProcessQueue; +import org.apache.rocketmq.client.impl.consumer.PullMessageService; +import org.apache.rocketmq.client.impl.consumer.RebalanceService; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.MQProducerInner; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; @@ -36,7 +65,11 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.common.protocol.heartbeat.*; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; +import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; +import org.apache.rocketmq.common.protocol.heartbeat.ProducerData; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; @@ -45,19 +78,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.client.impl.consumer.*; import org.slf4j.Logger; -import java.io.UnsupportedEncodingException; -import java.net.DatagramSocket; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - - public class MQClientInstance { private final static long LOCK_TIMEOUT_MILLIS = 3000; private final Logger log = ClientLogger.getLog(); @@ -75,7 +97,7 @@ public class MQClientInstance { private final Lock lockNamesrv = new ReentrantLock(); private final Lock lockHeartbeat = new ReentrantLock(); private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable = - new ConcurrentHashMap<String, HashMap<Long, String>>(); + new ConcurrentHashMap<String, HashMap<Long, String>>(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -92,12 +114,10 @@ public class MQClientInstance { private DatagramSocket datagramSocket; private Random random = new Random(); - public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId) { this(clientConfig, instanceIndex, clientId, null); } - public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) { this.clientConfig = clientConfig; this.instanceIndex = instanceIndex; @@ -125,10 +145,74 @@ public class MQClientInstance { this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService); log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", // - this.instanceIndex, // - this.clientId, // - this.clientConfig, // - MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer()); + this.instanceIndex, // + this.clientId, // + this.clientConfig, // + MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer()); + } + + public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) { + TopicPublishInfo info = new TopicPublishInfo(); + info.setTopicRouteData(route); + if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) { + String[] brokers = route.getOrderTopicConf().split(";"); + for (String broker : brokers) { + String[] item = broker.split(":"); + int nums = Integer.parseInt(item[1]); + for (int i = 0; i < nums; i++) { + MessageQueue mq = new MessageQueue(topic, item[0], i); + info.getMessageQueueList().add(mq); + } + } + + info.setOrderTopic(true); + } else { + List<QueueData> qds = route.getQueueDatas(); + Collections.sort(qds); + for (QueueData qd : qds) { + if (PermName.isWriteable(qd.getPerm())) { + BrokerData brokerData = null; + for (BrokerData bd : route.getBrokerDatas()) { + if (bd.getBrokerName().equals(qd.getBrokerName())) { + brokerData = bd; + break; + } + } + + if (null == brokerData) { + continue; + } + + if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) { + continue; + } + + for (int i = 0; i < qd.getWriteQueueNums(); i++) { + MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); + info.getMessageQueueList().add(mq); + } + } + } + + info.setOrderTopic(false); + } + + return info; + } + + public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) { + Set<MessageQueue> mqList = new HashSet<MessageQueue>(); + List<QueueData> qds = route.getQueueDatas(); + for (QueueData qd : qds) { + if (PermName.isReadable(qd.getPerm())) { + for (int i = 0; i < qd.getReadQueueNums(); i++) { + MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); + mqList.add(mq); + } + } + } + + return mqList; } public void start() throws MQClientException { @@ -166,7 +250,6 @@ public class MQClientInstance { } } - private void startScheduledTask() { if (null == this.clientConfig.getNamesrvAddr()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @@ -353,7 +436,7 @@ public class MQClientInstance { if (impl != null) { try { if (impl instanceof DefaultMQPushConsumerImpl) { - DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl) impl; + DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl)impl; dmq.adjustThreadPool(); } } catch (Exception e) { @@ -420,7 +503,7 @@ public class MQClientInstance { log.error("send heart beat to broker exception", e); } else { log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, - id, addr); + id, addr); } } } @@ -460,7 +543,7 @@ public class MQClientInstance { TopicRouteData topicRouteData; if (isDefault && defaultMQProducer != null) { topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), - 1000 * 3); + 1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); @@ -559,7 +642,6 @@ public class MQClientInstance { } } - // Producer for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) { MQProducerInner impl = entry.getValue(); @@ -590,7 +672,7 @@ public class MQClientInstance { } private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName, final String topic, - final String filterClassSource) throws UnsupportedEncodingException { + final String filterClassSource) throws UnsupportedEncodingException { byte[] classBody = null; int classCRC = 0; try { @@ -598,13 +680,13 @@ public class MQClientInstance { classCRC = UtilAll.crc32(classBody); } catch (Exception e1) { log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}", // - fullClassName, // - RemotingHelper.exceptionSimpleDesc(e1)); + fullClassName, // + RemotingHelper.exceptionSimpleDesc(e1)); } TopicRouteData topicRouteData = this.topicRouteTable.get(topic); if (topicRouteData != null // - && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) { + && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) { Iterator<Entry<String, List<String>>> it = topicRouteData.getFilterServerTable().entrySet().iterator(); while (it.hasNext()) { Entry<String, List<String>> next = it.next(); @@ -612,10 +694,10 @@ public class MQClientInstance { for (final String fsAddr : value) { try { this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody, - 5000); + 5000); log.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}", fsAddr, consumerGroup, - topic, fullClassName); + topic, fullClassName); } catch (Exception e) { log.error("uploadFilterClassToAllFilterServer Exception", e); @@ -624,7 +706,7 @@ public class MQClientInstance { } } else { log.warn("register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}", - consumerGroup, topic, fullClassName); + consumerGroup, topic, fullClassName); } } @@ -668,70 +750,6 @@ public class MQClientInstance { return result; } - public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) { - TopicPublishInfo info = new TopicPublishInfo(); - info.setTopicRouteData(route); - if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) { - String[] brokers = route.getOrderTopicConf().split(";"); - for (String broker : brokers) { - String[] item = broker.split(":"); - int nums = Integer.parseInt(item[1]); - for (int i = 0; i < nums; i++) { - MessageQueue mq = new MessageQueue(topic, item[0], i); - info.getMessageQueueList().add(mq); - } - } - - info.setOrderTopic(true); - } else { - List<QueueData> qds = route.getQueueDatas(); - Collections.sort(qds); - for (QueueData qd : qds) { - if (PermName.isWriteable(qd.getPerm())) { - BrokerData brokerData = null; - for (BrokerData bd : route.getBrokerDatas()) { - if (bd.getBrokerName().equals(qd.getBrokerName())) { - brokerData = bd; - break; - } - } - - if (null == brokerData) { - continue; - } - - if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) { - continue; - } - - for (int i = 0; i < qd.getWriteQueueNums(); i++) { - MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); - info.getMessageQueueList().add(mq); - } - } - } - - info.setOrderTopic(false); - } - - return info; - } - - public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) { - Set<MessageQueue> mqList = new HashSet<MessageQueue>(); - List<QueueData> qds = route.getQueueDatas(); - for (QueueData qd : qds) { - if (PermName.isReadable(qd.getPerm())) { - for (int i = 0; i < qd.getReadQueueNums(); i++) { - MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); - mqList.add(mq); - } - } - } - - return mqList; - } - public void shutdown() { // Consumer if (!this.consumerTable.isEmpty()) @@ -824,7 +842,7 @@ public class MQClientInstance { try { this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000); log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, - consumerGroup, brokerName, entry1.getKey(), addr); + consumerGroup, brokerName, entry1.getKey(), addr); } catch (RemotingException e) { log.error("unregister client exception from broker: " + addr, e); } catch (MQBrokerException e) { @@ -942,9 +960,9 @@ public class MQClientInstance { } public FindBrokerResult findBrokerAddressInSubscribe(// - final String brokerName, // - final long brokerId, // - final boolean onlyThisBroker// + final String brokerName, // + final long brokerId, // + final boolean onlyThisBroker// ) { String brokerAddr = null; boolean slave = false; @@ -1008,7 +1026,7 @@ public class MQClientInstance { try { MQConsumerInner impl = this.consumerTable.get(group); if (impl != null && impl instanceof DefaultMQPushConsumerImpl) { - consumer = (DefaultMQPushConsumerImpl) impl; + consumer = (DefaultMQPushConsumerImpl)impl; } else { log.info("[reset-offset] consumer dose not exist. group={}", group); return; @@ -1053,10 +1071,10 @@ public class MQClientInstance { public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) { MQConsumerInner impl = this.consumerTable.get(group); if (impl != null && impl instanceof DefaultMQPushConsumerImpl) { - DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) impl; + DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl)impl; return consumer.getOffsetStore().cloneOffsetTable(topic); } else if (impl != null && impl instanceof DefaultMQPullConsumerImpl) { - DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl) impl; + DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl)impl; return consumer.getOffsetStore().cloneOffsetTable(topic); } else { return Collections.EMPTY_MAP; @@ -1096,11 +1114,11 @@ public class MQClientInstance { } public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, // - final String consumerGroup, // - final String brokerName) { + final String consumerGroup, // + final String brokerName) { MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup); if (null != mqConsumerInner) { - DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner; + DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl)mqConsumerInner; ConsumeMessageDirectlyResult result = consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName); return result; @@ -1109,7 +1127,6 @@ public class MQClientInstance { return null; } - public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) { MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup); @@ -1128,12 +1145,11 @@ public class MQClientInstance { consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_NAMESERVER_ADDR, nsAddr); consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CONSUME_TYPE, mqConsumerInner.consumeType().name()); consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CLIENT_VERSION, - MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION)); + MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION)); return consumerRunningInfo; } - public ConsumerStatsManager getConsumerStatsManager() { return consumerStatsManager; }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index b53fa19..42bf360 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -16,6 +16,20 @@ */ package org.apache.rocketmq.client.impl.producer; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.common.ClientErrorCode; @@ -30,11 +44,29 @@ import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.help.FAQUrl; -import org.apache.rocketmq.common.message.*; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageId; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.message.MessageType; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; @@ -45,21 +77,14 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.client.producer.*; import org.slf4j.Logger; -import java.io.IOException; -import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.*; - - public class DefaultMQProducerImpl implements MQProducerInner { private final Logger log = ClientLogger.getLog(); private final Random random = new Random(); private final DefaultMQProducer defaultMQProducer; private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = - new ConcurrentHashMap<String, TopicPublishInfo>(); + new ConcurrentHashMap<String, TopicPublishInfo>(); private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); private final RPCHook rpcHook; protected BlockingQueue<Runnable> checkRequestQueue; @@ -71,12 +96,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); - public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { this(defaultMQProducer, null); } - public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { this.defaultMQProducer = defaultMQProducer; this.rpcHook = rpcHook; @@ -85,18 +108,18 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) { this.checkForbiddenHookList.add(checkForbiddenHook); log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(), - checkForbiddenHookList.size()); + checkForbiddenHookList.size()); } public void initTransactionEnv() { - TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer; + TransactionMQProducer producer = (TransactionMQProducer)this.defaultMQProducer; this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax()); this.checkExecutor = new ThreadPoolExecutor(// - producer.getCheckThreadPoolMinSize(), // - producer.getCheckThreadPoolMaxSize(), // - 1000 * 60, // - TimeUnit.MILLISECONDS, // - this.checkRequestQueue); + producer.getCheckThreadPoolMinSize(), // + producer.getCheckThreadPoolMaxSize(), // + 1000 * 60, // + TimeUnit.MILLISECONDS, // + this.checkRequestQueue); } public void destroyTransactionEnv() { @@ -130,8 +153,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() - + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), - null); + + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), + null); } this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); @@ -141,16 +164,16 @@ public class DefaultMQProducerImpl implements MQProducerInner { } log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), - this.defaultMQProducer.isSendMessageWithVIPChannel()); + this.defaultMQProducer.isSendMessageWithVIPChannel()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK, maybe started once, "// - + this.serviceState// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), - null); + + this.serviceState// + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); default: break; } @@ -167,7 +190,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) { throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.", - null); + null); } } @@ -215,7 +238,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { @Override public TransactionCheckListener checkListener() { if (this.defaultMQProducer instanceof TransactionMQProducer) { - TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; + TransactionMQProducer producer = (TransactionMQProducer)defaultMQProducer; return producer.getTransactionCheckListener(); } @@ -230,7 +253,6 @@ public class DefaultMQProducerImpl implements MQProducerInner { private final CheckTransactionStateRequestHeader checkRequestHeader = header; private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup(); - @Override public void run() { TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); @@ -245,19 +267,18 @@ public class DefaultMQProducerImpl implements MQProducerInner { } this.processTransactionState(// - localTransactionState, // - group, // - exception); + localTransactionState, // + group, // + exception); } else { log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group); } } - private void processTransactionState(// - final LocalTransactionState localTransactionState, // - final String producerGroup, // - final Throwable exception) { + final LocalTransactionState localTransactionState, // + final String producerGroup, // + final Throwable exception) { final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader(); thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset()); thisHeader.setProducerGroup(producerGroup); @@ -293,7 +314,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { try { DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, - 3000); + 3000); } catch (Exception e) { log.error("endTransactionOneway exception", e); } @@ -332,9 +353,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The producer service state not OK, "// - + this.serviceState// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), - null); + + this.serviceState// + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); } } @@ -370,13 +391,13 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) - throws MQClientException, InterruptedException { + throws MQClientException, InterruptedException { this.makeSureStateOK(); return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); } public MessageExt queryMessageByUniqKey(String topic, String uniqKey) - throws MQClientException, InterruptedException { + throws MQClientException, InterruptedException { this.makeSureStateOK(); return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); } @@ -389,7 +410,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public void send(Message msg, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { try { this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout); } catch (MQBrokerException e) { @@ -406,10 +427,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { } private SendResult sendDefaultImpl(// - Message msg, // - final CommunicationMode communicationMode, // - final SendCallback sendCallback, // - final long timeout// + Message msg, // + final CommunicationMode communicationMode, // + final SendCallback sendCallback, // + final long timeout// ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); @@ -508,16 +529,16 @@ public class DefaultMQProducerImpl implements MQProducerInner { } String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", - times, - System.currentTimeMillis() - beginTimestampFirst, - msg.getTopic(), - Arrays.toString(brokersSent)); + times, + System.currentTimeMillis() - beginTimestampFirst, + msg.getTopic(), + Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); MQClientException mqClientException = new MQClientException(info, exception); if (exception instanceof MQBrokerException) { - mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); + mqClientException.setResponseCode(((MQBrokerException)exception).getResponseCode()); } else if (exception instanceof RemotingConnectException) { mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); } else if (exception instanceof RemotingTimeoutException) { @@ -532,11 +553,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); if (null == nsList || nsList.isEmpty()) { throw new MQClientException( - "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); + "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); } throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), - null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); + null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); } private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { @@ -557,11 +578,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { } private SendResult sendKernelImpl(final Message msg, // - final MessageQueue mq, // - final CommunicationMode communicationMode, // - final SendCallback sendCallback, // - final TopicPublishInfo topicPublishInfo, // - final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + final MessageQueue mq, // + final CommunicationMode communicationMode, // + final SendCallback sendCallback, // + final TopicPublishInfo topicPublishInfo, // + final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); @@ -649,30 +670,30 @@ public class DefaultMQProducerImpl implements MQProducerInner { switch (communicationMode) { case ASYNC: sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(// - brokerAddr, // 1 - mq.getBrokerName(), // 2 - msg, // 3 - requestHeader, // 4 - timeout, // 5 - communicationMode, // 6 - sendCallback, // 7 - topicPublishInfo, // 8 - this.mQClientFactory, // 9 - this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10 - context, // - this); + brokerAddr, // 1 + mq.getBrokerName(), // 2 + msg, // 3 + requestHeader, // 4 + timeout, // 5 + communicationMode, // 6 + sendCallback, // 7 + topicPublishInfo, // 8 + this.mQClientFactory, // 9 + this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10 + context, // + this); break; case ONEWAY: case SYNC: sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( - brokerAddr, - mq.getBrokerName(), - msg, - requestHeader, - timeout, - communicationMode, - context, - this); + brokerAddr, + mq.getBrokerName(), + msg, + requestHeader, + timeout, + communicationMode, + context, + this); break; default: assert false; @@ -790,12 +811,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { * KERNEL SYNC ------------------------------------------------------- */ public SendResult send(Message msg, MessageQueue mq) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return send(msg, mq, this.defaultMQProducer.getSendMsgTimeout()); } public SendResult send(Message msg, MessageQueue mq, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); @@ -810,12 +831,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { * KERNEL ASYNC ------------------------------------------------------- */ public void send(Message msg, MessageQueue mq, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); } public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); @@ -848,21 +869,21 @@ public class DefaultMQProducerImpl implements MQProducerInner { * SELECT SYNC ------------------------------------------------------- */ public SendResult send(Message msg, MessageQueueSelector selector, Object arg) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout()); } public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout); } private SendResult sendSelectImpl(// - Message msg, // - MessageQueueSelector selector, // - Object arg, // - final CommunicationMode communicationMode, // - final SendCallback sendCallback, final long timeout// + Message msg, // + MessageQueueSelector selector, // + Object arg, // + final CommunicationMode communicationMode, // + final SendCallback sendCallback, final long timeout// ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); @@ -890,12 +911,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { * SELECT ASYNC ------------------------------------------------------- */ public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); } public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { try { this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout); } catch (MQBrokerException e) { @@ -907,7 +928,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { * SELECT ONEWAY ------------------------------------------------------- */ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { try { this.sendSelectImpl(msg, selector, arg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout()); } catch (MQBrokerException e) { @@ -916,7 +937,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) - throws MQClientException { + throws MQClientException { if (null == tranExecuter) { throw new MQClientException("tranExecutor is null", null); } @@ -988,9 +1009,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public void endTransaction(// - final SendResult sendResult, // - final LocalTransactionState localTransactionState, // - final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { + final SendResult sendResult, // + final LocalTransactionState localTransactionState, // + final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); @@ -1021,7 +1042,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, - this.defaultMQProducer.getSendMsgTimeout()); + this.defaultMQProducer.getSendMsgTimeout()); } public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { @@ -1036,17 +1057,14 @@ public class DefaultMQProducerImpl implements MQProducerInner { return zipCompressLevel; } - public void setZipCompressLevel(int zipCompressLevel) { this.zipCompressLevel = zipCompressLevel; } - public ServiceState getServiceState() { return serviceState; } - public void setServiceState(ServiceState serviceState) { this.serviceState = serviceState; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java index cac77ae..cf61326 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java @@ -6,41 +6,34 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl.producer; +import java.util.Set; import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; -import java.util.Set; - - public interface MQProducerInner { Set<String> getPublishTopicList(); - boolean isPublishTopicNeedUpdate(final String topic); - TransactionCheckListener checkListener(); - void checkTransactionState(// - final String addr, // - final MessageExt msg, // - final CheckTransactionStateRequestHeader checkRequestHeader); - + final String addr, // + final MessageExt msg, // + final CheckTransactionStateRequestHeader checkRequestHeader); void updateTopicPublishInfo(final String topic, final TopicPublishInfo info); - boolean isUnitMode(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java index dca20cb..c6f9d45 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java @@ -16,15 +16,13 @@ */ package org.apache.rocketmq.client.impl.producer; +import java.util.ArrayList; +import java.util.List; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import java.util.ArrayList; -import java.util.List; - - public class TopicPublishInfo { private boolean orderTopic = false; private boolean haveTopicRouterInfo = false; @@ -32,7 +30,6 @@ public class TopicPublishInfo { private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0); private TopicRouteData topicRouteData; - public boolean isOrderTopic() { return orderTopic; } @@ -49,32 +46,26 @@ public class TopicPublishInfo { return messageQueueList; } - public void setMessageQueueList(List<MessageQueue> messageQueueList) { this.messageQueueList = messageQueueList; } - public ThreadLocalIndex getSendWhichQueue() { return sendWhichQueue; } - public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) { this.sendWhichQueue = sendWhichQueue; } - public boolean isHaveTopicRouterInfo() { return haveTopicRouterInfo; } - public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) { this.haveTopicRouterInfo = haveTopicRouterInfo; } - public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); @@ -93,7 +84,6 @@ public class TopicPublishInfo { } } - public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); @@ -113,11 +103,10 @@ public class TopicPublishInfo { return -1; } - @Override public String toString() { return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList - + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]"; + + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]"; } public TopicRouteData getTopicRouteData() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java index 12dac4b..b61d855 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java @@ -17,13 +17,12 @@ package org.apache.rocketmq.client.latency; -import org.apache.rocketmq.client.common.ThreadLocalIndex; - import java.util.Collections; import java.util.Enumeration; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.client.common.ThreadLocalIndex; public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> { private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16); @@ -89,6 +88,14 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> return null; } + @Override + public String toString() { + return "LatencyFaultToleranceImpl{" + + "faultItemTable=" + faultItemTable + + ", whichItemWorst=" + whichItemWorst + + '}'; + } + class FaultItem implements Comparable<FaultItem> { private final String name; private volatile long currentLatency; @@ -101,9 +108,11 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> @Override public int compareTo(final FaultItem other) { if (this.isAvailable() != other.isAvailable()) { - if (this.isAvailable()) return -1; + if (this.isAvailable()) + return -1; - if (other.isAvailable()) return 1; + if (other.isAvailable()) + return 1; } if (this.currentLatency < other.currentLatency) @@ -128,20 +137,24 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> @Override public int hashCode() { int result = getName() != null ? getName().hashCode() : 0; - result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32)); - result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32)); + result = 31 * result + (int)(getCurrentLatency() ^ (getCurrentLatency() >>> 32)); + result = 31 * result + (int)(getStartTimestamp() ^ (getStartTimestamp() >>> 32)); return result; } @Override public boolean equals(final Object o) { - if (this == o) return true; - if (!(o instanceof FaultItem)) return false; + if (this == o) + return true; + if (!(o instanceof FaultItem)) + return false; - final FaultItem faultItem = (FaultItem) o; + final FaultItem faultItem = (FaultItem)o; - if (getCurrentLatency() != faultItem.getCurrentLatency()) return false; - if (getStartTimestamp() != faultItem.getStartTimestamp()) return false; + if (getCurrentLatency() != faultItem.getCurrentLatency()) + return false; + if (getStartTimestamp() != faultItem.getStartTimestamp()) + return false; return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null; } @@ -149,10 +162,10 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> @Override public String toString() { return "FaultItem{" + - "name='" + name + '\'' + - ", currentLatency=" + currentLatency + - ", startTimestamp=" + startTimestamp + - '}'; + "name='" + name + '\'' + + ", currentLatency=" + currentLatency + + ", startTimestamp=" + startTimestamp + + '}'; } public String getName() { @@ -175,14 +188,5 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> this.startTimestamp = startTimestamp; } - - } - - @Override - public String toString() { - return "LatencyFaultToleranceImpl{" + - "faultItemTable=" + faultItemTable + - ", whichItemWorst=" + whichItemWorst + - '}'; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java index cdfd5d1..70758dc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java @@ -25,8 +25,8 @@ public class MQFaultStrategy { private boolean sendLatencyFaultEnable = false; - private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; - private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; + private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; + private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; public long[] getNotAvailableDuration() { return notAvailableDuration; @@ -97,7 +97,8 @@ public class MQFaultStrategy { private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { - if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; + if (currentLatency >= latencyMax[i]) + return this.notAvailableDuration[i]; } return 0; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java b/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java index 3055119..7a05e76 100644 --- a/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java +++ b/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java @@ -16,38 +16,35 @@ */ package org.apache.rocketmq.client.log; +import java.lang.reflect.Method; +import java.net.URL; import org.apache.rocketmq.common.constant.LoggerName; import org.slf4j.ILoggerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Method; -import java.net.URL; - - public class ClientLogger { - private static Logger log; public static final String CLIENT_LOG_ROOT = "rocketmq.client.logRoot"; public static final String CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex"; public static final String CLIENT_LOG_LEVEL = "rocketmq.client.logLevel"; + private static Logger log; static { log = createLogger(LoggerName.CLIENT_LOGGER_NAME); } - private static Logger createLogger(final String loggerName) { String logConfigFilePath = - System.getProperty("rocketmq.client.log.configFile", - System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE")); + System.getProperty("rocketmq.client.log.configFile", + System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE")); Boolean isloadconfig = - Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true")); + Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true")); final String log4JResourceFile = - System.getProperty("rocketmq.client.log4j.resource.fileName", "log4j_rocketmq_client.xml"); + System.getProperty("rocketmq.client.log4j.resource.fileName", "log4j_rocketmq_client.xml"); final String logbackResourceFile = - System.getProperty("rocketmq.client.logback.resource.fileName", "logback_rocketmq_client.xml"); + System.getProperty("rocketmq.client.logback.resource.fileName", "logback_rocketmq_client.xml"); String clientLogRoot = System.getProperty(CLIENT_LOG_ROOT, "${user.home}/logs/rocketmqlogs"); System.setProperty("client.logRoot", clientLogRoot); @@ -85,11 +82,11 @@ public class ClientLogger { if (null == logConfigFilePath) { URL url = ClientLogger.class.getClassLoader().getResource(logbackResourceFile); Method doConfigure = - joranConfiguratoroObj.getClass().getMethod("doConfigure", URL.class); + joranConfiguratoroObj.getClass().getMethod("doConfigure", URL.class); doConfigure.invoke(joranConfiguratoroObj, url); } else { Method doConfigure = - joranConfiguratoroObj.getClass().getMethod("doConfigure", String.class); + joranConfiguratoroObj.getClass().getMethod("doConfigure", String.class); doConfigure.invoke(joranConfiguratoroObj, logConfigFilePath); } @@ -101,12 +98,10 @@ public class ClientLogger { return LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME); } - public static Logger getLog() { return log; } - public static void setLog(Logger log) { ClientLogger.log = log; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 340b1ff..736aa15 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -6,28 +6,30 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.producer; +import java.util.List; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageId; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; -import org.apache.rocketmq.common.message.*; - -import java.util.List; - public class DefaultMQProducer extends ClientConfig implements MQProducer { protected final transient DefaultMQProducerImpl defaultMQProducerImpl; @@ -44,27 +46,24 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { private boolean retryAnotherBrokerWhenNotStoreOK = false; private int maxMessageSize = 1024 * 1024 * 4; // 4M + public DefaultMQProducer() { this(MixAll.DEFAULT_PRODUCER_GROUP, null); } - public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); } - public DefaultMQProducer(final String producerGroup) { this(producerGroup, null); } - public DefaultMQProducer(RPCHook rpcHook) { this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook); } - @Override public void start() throws MQClientException { this.defaultMQProducerImpl.start(); @@ -75,169 +74,143 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { this.defaultMQProducerImpl.shutdown(); } - @Override public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException { return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic); } - @Override public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg); } - @Override public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, timeout); } - @Override public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, sendCallback); } - @Override public void send(Message msg, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, sendCallback, timeout); } - @Override public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.sendOneway(msg); } - @Override public SendResult send(Message msg, MessageQueue mq) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, mq); } - @Override public SendResult send(Message msg, MessageQueue mq, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, mq, timeout); } - @Override public void send(Message msg, MessageQueue mq, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, mq, sendCallback); } - @Override public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout); } - @Override public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.sendOneway(msg, mq); } - @Override public SendResult send(Message msg, MessageQueueSelector selector, Object arg) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, selector, arg); } - @Override public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, selector, arg, timeout); } - @Override public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback); } - @Override public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout); } - @Override public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.sendOneway(msg, selector, arg); } - @Override public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg) - throws MQClientException { + throws MQClientException { throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); } - @Override public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { createTopic(key, newTopic, queueNum, 0); } - @Override public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); } - @Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { return this.defaultMQProducerImpl.searchOffset(mq, timestamp); } - @Override public long maxOffset(MessageQueue mq) throws MQClientException { return this.defaultMQProducerImpl.maxOffset(mq); } - @Override public long minOffset(MessageQueue mq) throws MQClientException { return this.defaultMQProducerImpl.minOffset(mq); } - @Override public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { return this.defaultMQProducerImpl.earliestMsgStoreTime(mq); } - @Override public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { return this.defaultMQProducerImpl.viewMessage(offsetMsgId); } - @Override public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) - throws MQClientException, InterruptedException { + throws MQClientException, InterruptedException { return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end); } - @Override public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { @@ -252,97 +225,78 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { return producerGroup; } - public void setProducerGroup(String producerGroup) { this.producerGroup = producerGroup; } - public String getCreateTopicKey() { return createTopicKey; } - public void setCreateTopicKey(String createTopicKey) { this.createTopicKey = createTopicKey; } - public int getSendMsgTimeout() { return sendMsgTimeout; } - public void setSendMsgTimeout(int sendMsgTimeout) { this.sendMsgTimeout = sendMsgTimeout; } - public int getCompressMsgBodyOverHowmuch() { return compressMsgBodyOverHowmuch; } - public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) { this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch; } - public DefaultMQProducerImpl getDefaultMQProducerImpl() { return defaultMQProducerImpl; } - public boolean isRetryAnotherBrokerWhenNotStoreOK() { return retryAnotherBrokerWhenNotStoreOK; } - public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) { this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK; } - public int getMaxMessageSize() { return maxMessageSize; } - public void setMaxMessageSize(int maxMessageSize) { this.maxMessageSize = maxMessageSize; } - public int getDefaultTopicQueueNums() { return defaultTopicQueueNums; } - public void setDefaultTopicQueueNums(int defaultTopicQueueNums) { this.defaultTopicQueueNums = defaultTopicQueueNums; } - public int getRetryTimesWhenSendFailed() { return retryTimesWhenSendFailed; } - public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) { this.retryTimesWhenSendFailed = retryTimesWhenSendFailed; } - public boolean isSendMessageWithVIPChannel() { return isVipChannelEnabled(); } - public void setSendMessageWithVIPChannel(final boolean sendMessageWithVIPChannel) { this.setVipChannelEnabled(sendMessageWithVIPChannel); } - public long[] getNotAvailableDuration() { return this.defaultMQProducerImpl.getNotAvailableDuration(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java index a7246e0..1083f9b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.producer; import org.apache.rocketmq.common.message.Message; - public interface LocalTransactionExecuter { public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java index b907f81..209619a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.producer; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 492604e..b53652a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -6,16 +6,17 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.producer; +import java.util.List; import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -23,81 +24,61 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; -import java.util.List; - - public interface MQProducer extends MQAdmin { void start() throws MQClientException; void shutdown(); - List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException; - SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, - InterruptedException; - + InterruptedException; SendResult send(final Message msg, final long timeout) throws MQClientException, - RemotingException, MQBrokerException, InterruptedException; - + RemotingException, MQBrokerException, InterruptedException; void send(final Message msg, final SendCallback sendCallback) throws MQClientException, - RemotingException, InterruptedException; - + RemotingException, InterruptedException; void send(final Message msg, final SendCallback sendCallback, final long timeout) - throws MQClientException, RemotingException, InterruptedException; - + throws MQClientException, RemotingException, InterruptedException; void sendOneway(final Message msg) throws MQClientException, RemotingException, - InterruptedException; - + InterruptedException; SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, - RemotingException, MQBrokerException, InterruptedException; - + RemotingException, MQBrokerException, InterruptedException; SendResult send(final Message msg, final MessageQueue mq, final long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException; - + throws MQClientException, RemotingException, MQBrokerException, InterruptedException; void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException; - + throws MQClientException, RemotingException, InterruptedException; void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException; - + throws MQClientException, RemotingException, InterruptedException; void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, - RemotingException, InterruptedException; - + RemotingException, InterruptedException; SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException; - + throws MQClientException, RemotingException, MQBrokerException, InterruptedException; SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg, - final long timeout) throws MQClientException, RemotingException, MQBrokerException, - InterruptedException; - + final long timeout) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; void send(final Message msg, final MessageQueueSelector selector, final Object arg, - final SendCallback sendCallback) throws MQClientException, RemotingException, - InterruptedException; - + final SendCallback sendCallback) throws MQClientException, RemotingException, + InterruptedException; void send(final Message msg, final MessageQueueSelector selector, final Object arg, - final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, - InterruptedException; - + final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, + InterruptedException; void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) - throws MQClientException, RemotingException, InterruptedException; - + throws MQClientException, RemotingException, InterruptedException; TransactionSendResult sendMessageInTransaction(final Message msg, - final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; + final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java b/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java index 47956bb..761f45e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java @@ -6,22 +6,20 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.producer; +import java.util.List; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; -import java.util.List; - - public interface MessageQueueSelector { MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java index f599d83..178e79a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.producer; public interface SendCallback { public void onSuccess(final SendResult sendResult); - public void onException(final Throwable e); }