Repository: incubator-rocketmq Updated Branches: refs/heads/master 6baa2ed59 -> 7fcf2f1de
Add javadoc to DefaultMQProducer Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/7fcf2f1d Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/7fcf2f1d Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/7fcf2f1d Branch: refs/heads/master Commit: 7fcf2f1dec0943ea540551b73eb9bd13e3bec59d Parents: 6baa2ed Author: Zhanhui Li <[email protected]> Authored: Wed Jan 11 12:12:15 2017 +0800 Committer: Zhanhui Li <[email protected]> Committed: Wed Jan 11 12:12:15 2017 +0800 ---------------------------------------------------------------------- .../client/producer/DefaultMQProducer.java | 198 +++++++++++++++++++ 1 file changed, 198 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7fcf2f1d/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index c677324..a71a743 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -31,92 +31,290 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; +/** + * This class is the entry point for applications intending to send messages. + * </p> + * + * It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of + * box for most scenarios. + * </p> + * + * This class aggregates various <code>send</code> methods to deliver messages to brokers. 
Each of them has pros and + * cons; you'd better understand strengths and weakness of them before actually coding. + * </p> + * + * <p> + * <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe + * and used among multiple threads context. + * </p> + */ public class DefaultMQProducer extends ClientConfig implements MQProducer { + + /** + * Wrapping internal implementations for virtually all methods presented in this class. + */ protected final transient DefaultMQProducerImpl defaultMQProducerImpl; + + /** + * Producer group conceptually aggregates all producer instances of exactly same role, which is particularly + * important when transactional messages are involved. + * </p> + * + * For non-transactional messages, it does not matter as long as it's unique per process. + * </p> + * + * See {@linktourl http://rocketmq.incubator.apache.org/docs/core-concept/} for more discussion. + */ private String producerGroup; + /** * Just for testing or demo program */ private String createTopicKey = MixAll.DEFAULT_TOPIC; + + /** + * Number of queues to create per default topic. + */ private volatile int defaultTopicQueueNums = 4; + + /** + * Timeout for sending messages. + */ private int sendMsgTimeout = 3000; + + /** + * Compress message body threshold, namely, message body larger than 4k will be compressed on default. + */ private int compressMsgBodyOverHowmuch = 1024 * 4; + + /** + * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. + * </p> + * + * This may potentially cause message duplication which is up to application developers to resolve. + */ private int retryTimesWhenSendFailed = 2; + + /** + * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. + * </p> + * + * This may potentially cause message duplication which is up to application developers to resolve. 
+ */ private int retryTimesWhenSendAsyncFailed = 2; + /** + * Indicate whether to retry another broker on sending failure internally. + */ private boolean retryAnotherBrokerWhenNotStoreOK = false; + + /** + * Maximum allowed message size in bytes. + */ private int maxMessageSize = 1024 * 1024 * 4; // 4M + /** + * Default constructor. + */ public DefaultMQProducer() { this(MixAll.DEFAULT_PRODUCER_GROUP, null); } + /** + * Constructor specifying both producer group and RPC hook. + * + * @param producerGroup Producer group, see the namesake field. + * @param rpcHook RPC hook to execute per each remoting command execution. + */ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); } + /** + * Constructor specifying producer group. + * @param producerGroup Producer group, see the namesake field. + */ public DefaultMQProducer(final String producerGroup) { this(producerGroup, null); } + /** + * Constructor specifying the RPC hook. + * @param rpcHook RPC hook to execute per each remoting command execution. + */ public DefaultMQProducer(RPCHook rpcHook) { this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook); } + /** + * Start this producer instance. + * </p> + * + * <strong> + * Many internal initialization procedures are carried out to prepare this instance; thus, it is a must to invoke + * this method before sending or querying messages. + * </strong> + * </p> + * + * @throws MQClientException if there is any unexpected error. + */ @Override public void start() throws MQClientException { this.defaultMQProducerImpl.start(); } + /** + * This method shuts down this producer instance and releases related resources. + */ @Override public void shutdown() { this.defaultMQProducerImpl.shutdown(); } + /** + * Fetch message queues of topic <code>topic</code>, to which we may send/publish messages. + * @param topic Topic to fetch. 
+ * @return List of message queues ready to send messages to + * @throws MQClientException if there is any client error. + */ @Override public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException { return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic); } + /** + * Send message in synchronous mode. This method returns only when the sending procedure totally completes. + * </p> + * + * <strong>Warning:</strong> this method has an internal retry mechanism, that is, internal implementation will retry + * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially be + * delivered to broker(s). It's up to the application developers to resolve potential duplication issues. + * + * @param msg Message to send. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg); } + /** + * Same to {@link #send(Message)} with send timeout specified in addition. + * @param msg Message to send. + * @param timeout send timeout. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. 
+ * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, timeout); } + /** + * Send message to broker asynchronously. + * </p> + * + * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed. + * </p> + * + * Similar to {@link #send(Message)}, internal implementation would potentially retry up to + * {@link #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication + * and application developers are the one to resolve this potential issue. + * @param msg Message to send. + * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, sendCallback); } + /** + * Same to {@link #send(Message, SendCallback)} with send timeout specified in addition. + * @param msg message to send. + * @param sendCallback Callback to execute. + * @param timeout send timeout. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. 
+ */ @Override public void send(Message msg, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, sendCallback, timeout); } + /** + * Similar to <a href="https://en.wikipedia.org/wiki/User_Datagram_Protocol">UDP</a>, this method won't wait for + * acknowledgement from broker before returning. Obviously, it has maximum throughput yet potential for message loss. + * @param msg Message to send. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.sendOneway(msg); } + /** + * Same to {@link #send(Message)} with target message queue specified in addition. + * @param msg Message to send. + * @param mq Target message queue. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public SendResult send(Message msg, MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, mq); } + /** + * Same to {@link #send(Message)} with target message queue and send timeout specified. + * + * @param msg Message to send. + * @param mq Target message queue. + * @param timeout send timeout. 
+ * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public SendResult send(Message msg, MessageQueue mq, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, mq, timeout); } + /** + * Same to {@link #send(Message, SendCallback)} with target message queue specified. + * + * @param msg Message to send. + * @param mq Target message queue. + * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public void send(Message msg, MessageQueue mq, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
