Repository: incubator-rocketmq
Updated Branches:
  refs/heads/master de628a444 -> f32e0b9dc


Add javadoc to DefaultMQPushConsumer


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/f32e0b9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/f32e0b9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/f32e0b9d

Branch: refs/heads/master
Commit: f32e0b9dc37b52e89ce75cf419e2933a8cccde06
Parents: de628a4
Author: Zhanhui Li <lizhan...@apache.org>
Authored: Fri Jan 13 11:20:29 2017 +0800
Committer: Zhanhui Li <lizhan...@apache.org>
Committed: Fri Jan 13 11:20:29 2017 +0800

----------------------------------------------------------------------
 .../client/consumer/DefaultMQPushConsumer.java  | 187 ++++++++++++++++++-
 1 file changed, 178 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f32e0b9d/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 45f23a7..2cce03d 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -40,51 +40,116 @@ import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
 /**
- * Wrapped push consumer.in fact,it works as remarkable as the pull consumer
+ * In most scenarios, this is the most recommended class to consume messages.
+ * </p>
+ *
+ * Technically speaking, this push client is virtually a wrapper of the 
underlying pull service. Specifically, on
+ * arrival of messages pulled from brokers, it roughly invokes the registered 
callback handler to feed the messages.
+ * </p>
+ *
+ * See quickstart/Consumer in the example module for a typical usage.
+ * </p>
+ *
+ * <p>
+ *     <strong>Thread Safety:</strong> After initialization, the instance can 
be regarded as thread-safe.
+ * </p>
  */
 public class DefaultMQPushConsumer extends ClientConfig implements 
MQPushConsumer {
+
+    /**
+     * Internal implementation. Most of the functions herein are delegated to 
it.
+     */
     protected final transient DefaultMQPushConsumerImpl 
defaultMQPushConsumerImpl;
+
     /**
-     * Do the same thing for the same Group, the application must be set,and
-     * guarantee Globally unique
+     * Consumers of the same role are required to have exactly the same 
subscriptions and consumerGroup to correctly achieve
+     * load balance. It's required and needs to be globally unique.
+     * </p>
+     *
+     * See <a 
href="http://rocketmq.incubator.apache.org/docs/core-concept/">here</a> for 
further discussion.
      */
     private String consumerGroup;
+
     /**
-     * Consumption pattern,default is clustering
+     * Message model defines how messages are delivered to each 
consumer client.
+     * </p>
+     *
+     * RocketMQ supports two message models: clustering and broadcasting. If 
clustering is set, consumer clients with
+     * the same {@link #consumerGroup} would only consume shards of the 
messages subscribed, which achieves load
+     * balancing; conversely, if broadcasting is set, each consumer client 
will consume all subscribed messages
+     * separately.
+     * </p>
+     *
+     * This field defaults to clustering.
      */
     private MessageModel messageModel = MessageModel.CLUSTERING;
+
     /**
-     * Consumption offset
+     * Consuming point on consumer booting.
+     * </p>
+     *
+     * There are three consuming points:
+     * <ul>
+     *     <li>
+     *         <code>CONSUME_FROM_LAST_OFFSET</code>: consumer client picks up 
where it stopped previously.
+     *         If it is a newly booted consumer client, then according to the age 
of the consumer group, there are two
+     *         cases:
+     *         <ol>
+     *             <li>
+     *                 if the consumer group is created so recently that the 
earliest message being subscribed has not yet
+     *                 expired, which means the consumer group represents a 
lately launched business, consuming will
+     *                 start from the very beginning;
+     *             </li>
+     *             <li>
+     *                 if the earliest message being subscribed has expired, 
consuming will start from the latest
+     *                 messages, meaning messages born prior to the booting 
timestamp would be ignored.
+     *             </li>
+     *         </ol>
+     *     </li>
+     *     <li>
+     *         <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will 
start from earliest messages available.
+     *     </li>
+     *     <li>
+     *         <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start 
from specified timestamp, which means
+     *         messages born prior to {@link #consumeTimestamp} will be ignored
+     *     </li>
+     * </ul>
      */
     private ConsumeFromWhere consumeFromWhere = 
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
+
     /**
-     * Backtracking consumption time with second precision.time format is
+     * Backtracking consumption time with second precision. Time format is
      * 20131223171201<br>
      * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
-     * Default backtracking consumption time Half an hour ago
+     * Default backtracking consumption time Half an hour ago.
      */
     private String consumeTimestamp = 
UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
+
     /**
-     * Queue allocation algorithm
+     * Queue allocation algorithm specifying how message queues are allocated 
to each consumer client.
      */
     private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
 
     /**
      * Subscription relationship
      */
-    private Map<String /* topic */, String /* sub expression */> subscription 
= new HashMap<String, String>();
+    private Map<String /* topic */, String /* sub expression */> subscription 
= new HashMap<>();
+
     /**
      * Message listener
      */
     private MessageListener messageListener;
+
     /**
      * Offset Storage
      */
     private OffsetStore offsetStore;
+
     /**
      * Minimum consumer thread number
      */
     private int consumeThreadMin = 20;
+
     /**
      * Max consumer thread number
      */
@@ -99,18 +164,22 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      * Concurrently max span offset.it has no effect on sequential consumption
      */
     private int consumeConcurrentlyMaxSpan = 2000;
+
     /**
      * Flow control threshold
      */
     private int pullThresholdForQueue = 1000;
+
     /**
      * Message pull Interval
      */
     private long pullInterval = 0;
+
     /**
      * Batch consumption size
      */
     private int consumeMessageBatchMaxSize = 1;
+
     /**
      * Batch pull size
      */
@@ -126,24 +195,56 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      */
     private boolean unitMode = false;
 
+    /**
+     * Max re-consume times. -1 means 16 times.
+     * </p>
+     *
+     * If a message is re-consumed more than {@link #maxReconsumeTimes} times before 
success, it will be directed to a deletion
+     * queue waiting.
+     */
     private int maxReconsumeTimes = -1;
+
+    /**
+     * Suspending pulling time for cases requiring slow pulling like 
flow-control scenario.
+     */
     private long suspendCurrentQueueTimeMillis = 1000;
+
+    /**
+     * Maximum amount of time in minutes a message may block the consuming 
thread.
+     */
     private long consumeTimeout = 15;
 
+    /**
+     * Default constructor.
+     */
     public DefaultMQPushConsumer() {
         this(MixAll.DEFAULT_CONSUMER_GROUP, null, new 
AllocateMessageQueueAveragely());
     }
 
+    /**
+     * Constructor specifying consumer group, RPC hook and message queue 
allocating algorithm.
+     * @param consumerGroup Consumer group.
+     * @param rpcHook RPC hook to execute before each remoting command.
+     * @param allocateMessageQueueStrategy message queue allocating algorithm.
+     */
     public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, 
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
         this.consumerGroup = consumerGroup;
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
         defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, 
rpcHook);
     }
 
+    /**
+     * Constructor specifying RPC hook.
+     * @param rpcHook RPC hook to execute before each remoting command.
+     */
     public DefaultMQPushConsumer(RPCHook rpcHook) {
         this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new 
AllocateMessageQueueAveragely());
     }
 
+    /**
+     * Constructor specifying consumer group.
+     * @param consumerGroup Consumer group.
+     */
     public DefaultMQPushConsumer(final String consumerGroup) {
         this(consumerGroup, null, new AllocateMessageQueueAveragely());
     }
@@ -308,12 +409,33 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
         this.subscription = subscription;
     }
 
+    /**
+     * Send message back to broker which will be re-delivered in future.
+     * @param msg Message to send back.
+     * @param delayLevel delay level.
+     * @throws RemotingException if there is any network-tier error.
+     * @throws MQBrokerException if there is any broker error.
+     * @throws InterruptedException if the thread is interrupted.
+     * @throws MQClientException if there is any client error.
+     */
     @Override
     public void sendMessageBack(MessageExt msg, int delayLevel)
         throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
         this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
     }
 
+    /**
+     * Send message back to the broker whose name is <code>brokerName</code> 
and the message will be re-delivered in
+     * future.
+     *
+     * @param msg Message to send back.
+     * @param delayLevel delay level.
+     * @param brokerName broker name.
+     * @throws RemotingException if there is any network-tier error.
+     * @throws MQBrokerException if there is any broker error.
+     * @throws InterruptedException if the thread is interrupted.
+     * @throws MQClientException if there is any client error.
+     */
     @Override
     public void sendMessageBack(MessageExt msg, int delayLevel, String 
brokerName)
         throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
@@ -325,11 +447,18 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
         return 
this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic);
     }
 
+    /**
+     * This method gets internal infrastructure ready to serve. Instances 
must call this method after configuration.
+     * @throws MQClientException if there is any client error.
+     */
     @Override
     public void start() throws MQClientException {
         this.defaultMQPushConsumerImpl.start();
     }
 
+    /**
+     * Shut down this client and release underlying resources.
+     */
     @Override
     public void shutdown() {
         this.defaultMQPushConsumerImpl.shutdown();
@@ -342,43 +471,83 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
         
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
     }
 
+    /**
+     * Register a callback to execute on message arrival for concurrent 
consuming.
+     *
+     * @param messageListener message handling callback.
+     */
     @Override
     public void registerMessageListener(MessageListenerConcurrently 
messageListener) {
         this.messageListener = messageListener;
         
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
     }
 
+    /**
+     * Register a callback to execute on message arrival for orderly consuming.
+     *
+     * @param messageListener message handling callback.
+     */
     @Override
     public void registerMessageListener(MessageListenerOrderly 
messageListener) {
         this.messageListener = messageListener;
         
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
     }
 
+    /**
+     * Subscribe a topic to consuming subscription.
+     *
+     * @param topic topic to subscribe.
+     * @param subExpression subscription expression. It only supports the or 
operation, such as "tag1 || tag2 || tag3" <br>
+     *     If null or the * expression, it means subscribing to all.
+     * @throws MQClientException if there is any client error.
+     */
     @Override
     public void subscribe(String topic, String subExpression) throws 
MQClientException {
         this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
     }
 
+    /**
+     * Subscribe a topic to consuming subscription.
+     * @param topic topic to consume.
+     * @param fullClassName full class name, must extend 
org.apache.rocketmq.common.filter.MessageFilter
+     * @param filterClassSource class source code, using UTF-8 file 
encoding; you are responsible for the safety of your code
+     * @throws MQClientException if there is any client error.
+     */
     @Override
     public void subscribe(String topic, String fullClassName, String 
filterClassSource) throws MQClientException {
         this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, 
filterClassSource);
     }
 
+    /**
+     * Un-subscribe the specified topic from subscription.
+     * @param topic message topic
+     */
     @Override
     public void unsubscribe(String topic) {
         this.defaultMQPushConsumerImpl.unsubscribe(topic);
     }
 
+    /**
+     * Update the message consuming thread core pool size.
+     *
+     * @param corePoolSize new core pool size.
+     */
     @Override
     public void updateCorePoolSize(int corePoolSize) {
         this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize);
     }
 
+    /**
+     * Suspend pulling new messages.
+     */
     @Override
     public void suspend() {
         this.defaultMQPushConsumerImpl.suspend();
     }
 
+    /**
+     * Resume pulling.
+     */
     @Override
     public void resume() {
         this.defaultMQPushConsumerImpl.resume();

Reply via email to