http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java index 3cc2fdf..9fd1b34 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java @@ -6,36 +6,32 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.remoting.RPCHook; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - - public class MQClientManager { private static MQClientManager instance = new MQClientManager(); private AtomicInteger factoryIndexGenerator = new AtomicInteger(); private ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable = - new ConcurrentHashMap<String, MQClientInstance>(); - + new ConcurrentHashMap<String, MQClientInstance>(); private MQClientManager() { } - public static MQClientManager getInstance() { return instance; } @@ -49,8 +45,8 @@ public class MQClientManager { MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) { instance = - new MQClientInstance(clientConfig.cloneClientConfig(), - this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); + new MQClientInstance(clientConfig.cloneClientConfig(), + this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index e02bd4f..e7a6ca3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -16,6 +16,19 @@ */ package org.apache.rocketmq.client.impl.consumer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -35,10 +48,6 @@ import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.slf4j.Logger; -import java.util.*; -import java.util.concurrent.*; - - public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { private static final Logger log = ClientLogger.getLog(); private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; @@ -51,9 +60,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService cleanExpireMsgExecutors; - public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, - MessageListenerConcurrently messageListener) { + MessageListenerConcurrently messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; @@ -62,18 +70,17 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor(// - this.defaultMQPushConsumer.getConsumeThreadMin(), // - this.defaultMQPushConsumer.getConsumeThreadMax(), // - 1000 * 60, // - TimeUnit.MILLISECONDS, // - this.consumeRequestQueue, // - new ThreadFactoryImpl("ConsumeMessageThread_")); + this.defaultMQPushConsumer.getConsumeThreadMin(), // + this.defaultMQPushConsumer.getConsumeThreadMax(), // + 1000 * 60, // + TimeUnit.MILLISECONDS, // + this.consumeRequestQueue, // + new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_")); } - public void start() { this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { @@ -85,7 +92,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); } - public void shutdown() { this.scheduledExecutorService.shutdown(); this.consumeExecutor.shutdown(); @@ -95,8 +101,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService @Override public void updateCorePoolSize(int corePoolSize) { if (corePoolSize > 0 // - && corePoolSize <= Short.MAX_VALUE // - && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { + && corePoolSize <= Short.MAX_VALUE // + && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { this.consumeExecutor.setCorePoolSize(corePoolSize); } } @@ -180,10 +186,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService result.setRemark(RemotingHelper.exceptionSimpleDesc(e)); log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", // - RemotingHelper.exceptionSimpleDesc(e), // - ConsumeMessageConcurrentlyService.this.consumerGroup, // - msgs, // - mq), e); + RemotingHelper.exceptionSimpleDesc(e), // + ConsumeMessageConcurrentlyService.this.consumerGroup, // + msgs, // + mq), e); } result.setSpentTimeMills(System.currentTimeMillis() - beginTime); @@ -195,10 +201,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService @Override public void submitConsumeRequest(// - final List<MessageExt> msgs, // - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // - final boolean dispatchToConsume) { + final List<MessageExt> msgs, // + final ProcessQueue processQueue, // + final MessageQueue messageQueue, // + final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); @@ -244,7 +250,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService private void cleanExpireMsg() { Iterator<Map.Entry<MessageQueue, ProcessQueue>> it = - this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator(); + this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator(); while (it.hasNext()) { Map.Entry<MessageQueue, ProcessQueue> next = it.next(); ProcessQueue pq = next.getValue(); @@ -253,9 +259,9 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService } public void processConsumeResult(// - final ConsumeConcurrentlyStatus status, // - final ConsumeConcurrentlyContext context, // - final ConsumeRequest consumeRequest// + final ConsumeConcurrentlyStatus status, // + final ConsumeConcurrentlyContext context, // + final ConsumeRequest consumeRequest// ) { int ackIndex = context.getAckIndex(); @@ -275,7 +281,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService case RECONSUME_LATER: ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), - consumeRequest.getMsgs().size()); + consumeRequest.getMsgs().size()); break; default: break; @@ -333,9 +339,9 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService } private void submitConsumeRequestLater(// - final List<MessageExt> msgs, // - final ProcessQueue processQueue, // - final MessageQueue messageQueue// + final List<MessageExt> msgs, // + final ProcessQueue processQueue, // + final MessageQueue messageQueue// ) { this.scheduledExecutorService.schedule(new Runnable() { @@ -364,7 +370,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService private final ProcessQueue processQueue; private final MessageQueue messageQueue; - public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) { this.msgs = msgs; this.processQueue = processQueue; @@ -414,10 +419,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", - RemotingHelper.exceptionSimpleDesc(e), // - ConsumeMessageConcurrentlyService.this.consumerGroup, - msgs, - messageQueue); + RemotingHelper.exceptionSimpleDesc(e), // + ConsumeMessageConcurrentlyService.this.consumerGroup, + msgs, + messageQueue); hasException = true; } long consumeRT = System.currentTimeMillis() - beginTimestamp; @@ -437,9 +442,9 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", - ConsumeMessageConcurrentlyService.this.consumerGroup, - msgs, - messageQueue); + ConsumeMessageConcurrentlyService.this.consumerGroup, + msgs, + messageQueue); status = ConsumeConcurrentlyStatus.RECONSUME_LATER; } @@ -450,7 +455,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService } ConsumeMessageConcurrentlyService.this.getConsumerStatsManager() - .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); + .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); @@ -463,6 +468,5 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService return messageQueue; } - } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index f6a1e4d..3def223 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -16,35 +16,42 @@ */ package org.apache.rocketmq.client.impl.consumer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.hook.ConsumeMessageContext; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.stat.ConsumerStatsManager; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.message.*; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.CMResult; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; -import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.slf4j.Logger; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.*; - - public class ConsumeMessageOrderlyService implements ConsumeMessageService { private static final Logger log = ClientLogger.getLog(); private final static long MAX_TIME_CONSUME_CONTINUOUSLY = - Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000")); + Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000")); private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; private final DefaultMQPushConsumer defaultMQPushConsumer; private final MessageListenerOrderly messageListener; @@ -55,7 +62,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { private final ScheduledExecutorService scheduledExecutorService; private volatile boolean stopped = false; - public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; @@ -65,17 +71,16 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor(// - this.defaultMQPushConsumer.getConsumeThreadMin(), // - this.defaultMQPushConsumer.getConsumeThreadMax(), // - 1000 * 60, // - TimeUnit.MILLISECONDS, // - this.consumeRequestQueue, // - new ThreadFactoryImpl("ConsumeMessageThread_")); + this.defaultMQPushConsumer.getConsumeThreadMin(), // + this.defaultMQPushConsumer.getConsumeThreadMax(), // + 1000 * 60, // + TimeUnit.MILLISECONDS, // + this.consumeRequestQueue, // + new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); } - public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @@ -87,7 +92,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { } } - public void shutdown() { this.stopped = true; this.scheduledExecutorService.shutdown(); @@ -97,7 +101,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { } } - public synchronized void unlockAllMQ() { this.defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false); } @@ -105,8 +108,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { @Override public void updateCorePoolSize(int corePoolSize) { if (corePoolSize > 0 // - && corePoolSize <= Short.MAX_VALUE // - && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { + && corePoolSize <= Short.MAX_VALUE // + && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { this.consumeExecutor.setCorePoolSize(corePoolSize); } } @@ -169,10 +172,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { result.setRemark(RemotingHelper.exceptionSimpleDesc(e)); log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", // - RemotingHelper.exceptionSimpleDesc(e), // - ConsumeMessageOrderlyService.this.consumerGroup, // - msgs, // - mq), e); + RemotingHelper.exceptionSimpleDesc(e), // + ConsumeMessageOrderlyService.this.consumerGroup, // + msgs, // + mq), e); } result.setAutoCommit(context.isAutoCommit()); @@ -185,10 +188,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { @Override public void submitConsumeRequest(// - final List<MessageExt> msgs, // - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // - final boolean dispathToConsume) { + final List<MessageExt> msgs, // + final ProcessQueue processQueue, // + final MessageQueue messageQueue, // + final boolean dispathToConsume) { if (dispathToConsume) { ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); @@ -224,9 +227,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { } private void submitConsumeRequestLater(// - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // - final long suspendTimeMillis// + final ProcessQueue processQueue, // + final MessageQueue messageQueue, // + final long suspendTimeMillis// ) { long timeMillis = suspendTimeMillis; if (timeMillis == -1) { @@ -249,10 +252,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { } public boolean processConsumeResult(// - final List<MessageExt> msgs, // - final ConsumeOrderlyStatus status, // - final ConsumeOrderlyContext context, // - final ConsumeRequest consumeRequest// + final List<MessageExt> msgs, // + final ConsumeOrderlyStatus status, // + final ConsumeOrderlyContext context, // + final ConsumeRequest consumeRequest// ) { boolean continueConsume = true; long commitOffset = -1L; @@ -261,7 +264,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { case COMMIT: case ROLLBACK: log.warn("the message queue consume result is illegal, we think you want to ack these message {}", - consumeRequest.getMessageQueue()); + consumeRequest.getMessageQueue()); case SUCCESS: commitOffset = consumeRequest.getProcessQueue().commit(); this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); @@ -271,9 +274,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { if (checkReconsumeTimes(msgs)) { consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); this.submitConsumeRequestLater(// - consumeRequest.getProcessQueue(), // - consumeRequest.getMessageQueue(), // - context.getSuspendCurrentQueueTimeMillis()); + consumeRequest.getProcessQueue(), // + consumeRequest.getMessageQueue(), // + context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; } else { commitOffset = consumeRequest.getProcessQueue().commit(); @@ -293,9 +296,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { case ROLLBACK: consumeRequest.getProcessQueue().rollback(); this.submitConsumeRequestLater(// - consumeRequest.getProcessQueue(), // - consumeRequest.getMessageQueue(), // - context.getSuspendCurrentQueueTimeMillis()); + consumeRequest.getProcessQueue(), // + consumeRequest.getMessageQueue(), // + context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; break; case SUSPEND_CURRENT_QUEUE_A_MOMENT: @@ -303,9 +306,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { if (checkReconsumeTimes(msgs)) { consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); this.submitConsumeRequestLater(// - consumeRequest.getProcessQueue(), // - consumeRequest.getMessageQueue(), // - context.getSuspendCurrentQueueTimeMillis()); + consumeRequest.getProcessQueue(), // + consumeRequest.getMessageQueue(), // + context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; } break; @@ -379,7 +382,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { private final ProcessQueue processQueue; private final MessageQueue messageQueue; - public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) { this.processQueue = processQueue; this.messageQueue = messageQueue; @@ -403,7 +405,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) - || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { + || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { final long beginTime = System.currentTimeMillis(); for (boolean continueConsume = true; continueConsume; ) { if (this.processQueue.isDropped()) { @@ -412,14 +414,14 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { } if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) - && !this.processQueue.isLocked()) { + && !this.processQueue.isLocked()) { log.warn("the message queue not locked, so consume later, {}", this.messageQueue); ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) - && this.processQueue.isLockExpired()) { + && this.processQueue.isLockExpired()) { log.warn("the message queue lock expired, so consume later, {}", this.messageQueue); ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; @@ -432,7 +434,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { } final int consumeBatchSize = - ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); + ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); if (!msgs.isEmpty()) { @@ -444,7 +446,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext - .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup()); + .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false); @@ -460,29 +462,29 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", - this.messageQueue); + this.messageQueue); break; } status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", // - RemotingHelper.exceptionSimpleDesc(e), // - ConsumeMessageOrderlyService.this.consumerGroup, // - msgs, // - messageQueue); + RemotingHelper.exceptionSimpleDesc(e), // + ConsumeMessageOrderlyService.this.consumerGroup, // + msgs, // + messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); } if (null == status // - || ConsumeOrderlyStatus.ROLLBACK == status// - || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { + || ConsumeOrderlyStatus.ROLLBACK == status// + || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", // - ConsumeMessageOrderlyService.this.consumerGroup, // - msgs, // - messageQueue); + ConsumeMessageOrderlyService.this.consumerGroup, // + msgs, // + messageQueue); } long consumeRT = System.currentTimeMillis() - beginTimestamp; @@ -507,12 +509,12 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.setStatus(status.toString()); consumeMessageContext - .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status); + .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status); ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); } ConsumeMessageOrderlyService.this.getConsumerStatsManager() - .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); + .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); } else { @@ -530,7 +532,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { } } - } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java index 3dc768c..a59ab98 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java @@ -6,48 +6,39 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl.consumer; +import java.util.List; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import java.util.List; - - public interface ConsumeMessageService { void start(); - void shutdown(); - void updateCorePoolSize(int corePoolSize); - void incCorePoolSize(); - void decCorePoolSize(); - int getCorePoolSize(); - ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName); - void submitConsumeRequest(// - final List<MessageExt> msgs, // - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // - final boolean dispathToConsume); + final List<MessageExt> msgs, // + final ProcessQueue processQueue, // + final MessageQueue messageQueue, // + final boolean dispathToConsume); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index f216533..7c1b4d6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -16,6 +16,13 @@ */ package org.apache.rocketmq.client.impl.consumer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; @@ -41,7 +48,11 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; -import org.apache.rocketmq.common.message.*; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -52,10 +63,6 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.slf4j.Logger; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - - public class DefaultMQPullConsumerImpl implements MQConsumerInner { private final Logger log = ClientLogger.getLog(); private final DefaultMQPullConsumer defaultMQPullConsumer; @@ -69,7 +76,6 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { private OffsetStore offsetStore; private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this); - public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) { this.defaultMQPullConsumer = defaultMQPullConsumer; this.rpcHook = rpcHook; @@ -92,9 +98,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The consumer service state not OK, "// - + this.serviceState// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), - null); + + this.serviceState// + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); } } @@ -146,17 +152,17 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { } public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return pull(mq, subExpression, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis()); } public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout); } private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); if (null == mq) { @@ -179,7 +185,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { SubscriptionData subscriptionData; try { subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // - mq.getTopic(), subExpression); + mq.getTopic(), subExpression); } catch (Exception e) { throw new MQClientException("parse subscription error", e); } @@ -187,17 +193,17 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(// - mq, // 1 - subscriptionData.getSubString(), // 2 - 0L, // 3 - offset, // 4 - maxNums, // 5 - sysFlag, // 6 - 0, // 7 - this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8 - timeoutMillis, // 9 - CommunicationMode.SYNC, // 10 - null// 11 + mq, // 1 + subscriptionData.getSubString(), // 2 + 0L, // 3 + offset, // 4 + maxNums, // 5 + sysFlag, // 6 + 0, // 7 + this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8 + timeoutMillis, // 9 + CommunicationMode.SYNC, // 10 + null// 11 ); this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); if (!this.consumeMessageHookList.isEmpty()) { @@ -219,7 +225,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) { try { SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // - topic, SubscriptionData.SUB_ALL); + topic, SubscriptionData.SUB_ALL); this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData); } catch (Exception e) { } @@ -357,23 +363,23 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { } public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { pull(mq, subExpression, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis()); } public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout); } private void pullAsyncImpl(// - final MessageQueue mq, // - final String subExpression, // - final long offset, // - final int maxNums, // - final PullCallback pullCallback, // - final boolean block, // - final long timeout) throws MQClientException, RemotingException, InterruptedException { + final MessageQueue mq, // + final String subExpression, // + final long offset, // + final int maxNums, // + final PullCallback pullCallback, // + final boolean block, // + final long timeout) throws MQClientException, RemotingException, InterruptedException { this.makeSureStateOK(); if (null == mq) { @@ -400,7 +406,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { final SubscriptionData subscriptionData; try { subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // - mq.getTopic(), subExpression); + mq.getTopic(), subExpression); } catch (Exception e) { throw new MQClientException("parse subscription error", e); } @@ -408,36 +414,36 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; this.pullAPIWrapper.pullKernelImpl(// - mq, // 1 - subscriptionData.getSubString(), // 2 - 0L, // 3 - offset, // 4 - maxNums, // 5 - sysFlag, // 6 - 0, // 7 - this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8 - timeoutMillis, // 9 - CommunicationMode.ASYNC, // 10 - new PullCallback() { - - @Override - public void onSuccess(PullResult pullResult) { - pullCallback - .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData)); - } - - @Override - public void onException(Throwable e) { - pullCallback.onException(e); - } - }); + mq, // 1 + subscriptionData.getSubString(), // 2 + 0L, // 3 + offset, // 4 + maxNums, // 5 + sysFlag, // 6 + 0, // 7 + this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8 + timeoutMillis, // 9 + CommunicationMode.ASYNC, // 10 + new PullCallback() { + + @Override + public void onSuccess(PullResult pullResult) { + pullCallback + .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData)); + } + + @Override + public void onException(Throwable e) { + pullCallback.onException(e); + } + }); } catch (MQBrokerException e) { throw new MQClientException("pullAsync unknow exception", e); } } public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); } @@ -446,19 +452,19 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { } public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) - throws MQClientException, RemotingException, InterruptedException { + throws MQClientException, RemotingException, InterruptedException { this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true, - this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); + this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); } public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) - throws MQClientException, InterruptedException { + throws MQClientException, InterruptedException { this.makeSureStateOK(); return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); } public MessageExt queryMessageByUniqKey(String topic, String uniqKey) - throws MQClientException, InterruptedException { + throws MQClientException, InterruptedException { this.makeSureStateOK(); return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); } @@ -469,27 +475,27 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { } public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { sendMessageBack(msg, delayLevel, brokerName, this.defaultMQPullConsumer.getConsumerGroup()); } public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, - MQBrokerException, InterruptedException, MQClientException { + MQBrokerException, InterruptedException, MQClientException { this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway); } public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) - : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); + : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); if (UtilAll.isBlank(consumerGroup)) { consumerGroup = this.defaultMQPullConsumer.getConsumerGroup(); } this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000, - this.defaultMQPullConsumer.getMaxReconsumeTimes()); + this.defaultMQPullConsumer.getMaxReconsumeTimes()); } catch (Exception e) { log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e); @@ -545,8 +551,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); this.pullAPIWrapper = new PullAPIWrapper(// - mQClientFactory, // - this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode()); + mQClientFactory, // + this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this.defaultMQPullConsumer.getOffsetStore() != null) { @@ -571,8 +577,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup() - + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), - null); + + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), + null); } mQClientFactory.start(); @@ -583,9 +589,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PullConsumer service state not OK, maybe started once, "// - + this.serviceState// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), - null); + + this.serviceState// + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); default: break; } @@ -598,43 +604,43 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { // consumerGroup if (null == this.defaultMQPullConsumer.getConsumerGroup()) { throw new MQClientException( - "consumerGroup is null" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // - null); + "consumerGroup is null" // + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + null); } // consumerGroup if (this.defaultMQPullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) { throw new MQClientException( - "consumerGroup can not equal "// - + MixAll.DEFAULT_CONSUMER_GROUP // - + ", please specify another one."// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // - null); + "consumerGroup can not equal "// + + MixAll.DEFAULT_CONSUMER_GROUP // + + ", please specify another one."// + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + null); } // messageModel if (null == this.defaultMQPullConsumer.getMessageModel()) { throw new MQClientException( - "messageModel is null" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // - null); + "messageModel is null" // + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + null); } // allocateMessageQueueStrategy if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) { throw new MQClientException( - "allocateMessageQueueStrategy is null" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // - null); + "allocateMessageQueueStrategy is null" // + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + null); } // allocateMessageQueueStrategy if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) { throw new MQClientException( - "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // - null); + "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" // + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + null); } } @@ -644,7 +650,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { if (registerTopics != null) { for (final String topic : registerTopics) { SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // - topic, SubscriptionData.SUB_ALL); + topic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } @@ -696,7 +702,6 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { return consumerStartTimestamp; } - public RebalanceImpl getRebalanceImpl() { return rebalanceImpl; }
