This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch test-release in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit a1a92ce9951cad793fff5ca0fbd8c56c82aa3bee Author: Heng Du <[email protected]> AuthorDate: Fri Dec 27 20:11:43 2019 +0800 feat(pullconsumer) add pull sys flag (#1658) feat(lite_pull_consumer) add support for consume from where test(pull_consumer) add unit test for compute from where test(PullSysFlag) add lite pull sys flag unit test feat(pull_consumer) add the verification of ConsumerFromWhere value --- .../client/consumer/DefaultLitePullConsumer.java | 31 ++++++++++ .../client/impl/consumer/AssignedMessageQueue.java | 2 +- .../impl/consumer/DefaultLitePullConsumerImpl.java | 17 +++--- .../client/impl/consumer/RebalanceImpl.java | 5 -- .../impl/consumer/RebalanceLitePullImpl.java | 71 ++++++++++++++++++++-- .../consumer/DefaultLitePullConsumerTest.java | 45 ++++++++++++++ .../rocketmq/common/sysflag/PullSysFlag.java | 16 +++++ .../rocketmq/common/sysflag/PullSysFlagTest.java | 32 +++++----- .../example/simple/LitePullConsumerSubscribe.java | 4 +- 9 files changed, 184 insertions(+), 39 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 52060ea..782d29b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -24,6 +24,8 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.NamespaceUtil; @@ -138,6 +140,14 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon */ private long topicMetadataCheckIntervalMillis = 30 * 1000; + private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; + + /** + * Backtracking consumption time with second precision. Time format is 20131223171201<br> Implying Seventeen twelve + * and 01 seconds on December 23, 2013 year<br> Default backtracking consumption time Half an hour ago. + */ + private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30)); + /** * Default constructor. */ @@ -431,4 +441,25 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } + + public ConsumeFromWhere getConsumeFromWhere() { + return consumeFromWhere; + } + + public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { + if (consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET + && consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET + && consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) { + throw new RuntimeException("Invalid ConsumeFromWhere Value", null); + } + this.consumeFromWhere = consumeFromWhere; + } + + public String getConsumeTimestamp() { + return consumeTimestamp; + } + + public void setConsumeTimestamp(String consumeTimestamp) { + this.consumeTimestamp = consumeTimestamp; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java index c0c6f60..0b090e3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java @@ -90,7 +90,7 @@ public class AssignedMessageQueue { } } - public long getConusmerOffset(MessageQueue messageQueue) { + public long getConsumerOffset(MessageQueue messageQueue) { MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); if (messageQueueState != null) { return messageQueueState.getConsumeOffset(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index f44eea7..cd4d4cf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -597,7 +597,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public synchronized void commitSync() { try { for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { - long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue); + long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue); if (consumerOffset != -1) { ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); @@ -618,7 +618,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { private synchronized void commitAll() { try { for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { - long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue); + long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue); if (consumerOffset != -1) { ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); @@ -650,9 +650,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - private long fetchConsumeOffset(MessageQueue messageQueue, boolean fromStore) { + private long fetchConsumeOffset(MessageQueue messageQueue) { checkServiceState(); - return this.offsetStore.readOffset(messageQueue, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE); + long offset = this.rebalanceImpl.computePullFromWhere(messageQueue); + return offset; } public long committed(MessageQueue messageQueue) throws MQClientException { @@ -685,10 +686,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } else { offset = assignedMessageQueue.getPullOffset(messageQueue); if (offset == -1) { - offset = fetchConsumeOffset(messageQueue, false); - if (offset == -1 && defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) { - offset = 0; - } + offset = fetchConsumeOffset(messageQueue); } } return offset; @@ -779,7 +777,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchSize()); - switch (pullResult.getPullStatus()) { case FOUND: final Object objLock = messageQueueLock.fetchLockObject(messageQueue); @@ -850,7 +847,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { throw new MQClientException("maxNums <= 0", null); } - int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); + int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false, true); long timeoutMillis = block ? this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 146fce6..b8972a9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -40,11 +40,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; -/** - * This class will be removed in 2022, and a better implementation {@link RebalanceLitePullImpl} is recommend to use - * in the scenario of actively pulling messages. - */ -@Deprecated public abstract class RebalanceImpl { protected static final InternalLogger log = ClientLogger.getLog(); protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java index 0b8ec67..9d1ea74 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java @@ -16,16 +16,20 @@ */ package org.apache.rocketmq.client.impl.consumer; +import java.util.List; +import java.util.Set; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.MessageQueueListener; +import org.apache.rocketmq.client.consumer.store.ReadOffsetType; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import java.util.List; -import java.util.Set; - public class RebalanceLitePullImpl extends RebalanceImpl { private final DefaultLitePullConsumerImpl litePullConsumerImpl; @@ -72,7 +76,66 @@ public class RebalanceLitePullImpl extends RebalanceImpl { @Override public long computePullFromWhere(MessageQueue mq) { - return 0; + ConsumeFromWhere consumeFromWhere = litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeFromWhere(); + long result = -1; + switch (consumeFromWhere) { + case CONSUME_FROM_LAST_OFFSET: { + long lastOffset = litePullConsumerImpl.getOffsetStore().readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE); + if (lastOffset >= 0) { + result = lastOffset; + } else if (-1 == lastOffset) { + if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // First start, no offset + result = 0L; + } else { + try { + result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); + } catch (MQClientException e) { + result = -1; + } + } + } else { + result = -1; + } + break; + } + case CONSUME_FROM_FIRST_OFFSET: { + long lastOffset = litePullConsumerImpl.getOffsetStore().readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE); + if (lastOffset >= 0) { + result = lastOffset; + } else if (-1 == lastOffset) { + result = 0L; + } else { + result = -1; + } + break; + } + case CONSUME_FROM_TIMESTAMP: { + long lastOffset = litePullConsumerImpl.getOffsetStore().readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE); + if (lastOffset >= 0) { + result = lastOffset; + } else if (-1 == lastOffset) { + if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + try { + result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); + } catch (MQClientException e) { + result = -1; + } + } else { + try { + long timestamp = UtilAll.parseDate(this.litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeTimestamp(), + UtilAll.YYYYMMDDHHMMSS).getTime(); + result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); + } catch (MQClientException e) { + result = -1; + } + } + } else { + result = -1; + } + break; + } + } + return result; } @Override diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index d2cb057..cc8d5e2 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -41,6 +41,7 @@ import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; import org.apache.rocketmq.client.impl.consumer.RebalanceService; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -419,6 +420,50 @@ public class DefaultLitePullConsumerTest { } + @Test + public void testComputePullFromWhereReturnedNotFound() throws Exception{ + DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer(); + defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + MessageQueue messageQueue = createMessageQueue(); + when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L); + long offset = rebalanceImpl.computePullFromWhere(messageQueue); + assertThat(offset).isEqualTo(0); + } + + @Test + public void testComputePullFromWhereReturned() throws Exception{ + DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer(); + defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + MessageQueue messageQueue = createMessageQueue(); + when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(100L); + long offset = rebalanceImpl.computePullFromWhere(messageQueue); + assertThat(offset).isEqualTo(100); + } + + + @Test + public void testComputePullFromLast() throws Exception{ + DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer(); + defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + MessageQueue messageQueue = createMessageQueue(); + when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L); + when(mQClientFactory.getMQAdminImpl().maxOffset(any(MessageQueue.class))).thenReturn(100L); + long offset = rebalanceImpl.computePullFromWhere(messageQueue); + assertThat(offset).isEqualTo(100); + } + + @Test + public void testComputePullByTimeStamp() throws Exception{ + DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer(); + defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); + defaultLitePullConsumer.setConsumeTimestamp("20191024171201"); + MessageQueue messageQueue = createMessageQueue(); + when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L); + when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class),anyLong())).thenReturn(100L); + long offset = rebalanceImpl.computePullFromWhere(messageQueue); + assertThat(offset).isEqualTo(100); + } + private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception { Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl"); diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java index d476a35..ce7558f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java +++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java @@ -21,6 +21,7 @@ public class PullSysFlag { private final static int FLAG_SUSPEND = 0x1 << 1; private final static int FLAG_SUBSCRIPTION = 0x1 << 2; private final static int FLAG_CLASS_FILTER = 0x1 << 3; + private final static int FLAG_LITE_PULL_MESSAGE = 0x1 << 4; public static int buildSysFlag(final boolean commitOffset, final boolean suspend, final boolean subscription, final boolean classFilter) { @@ -45,6 +46,17 @@ public class PullSysFlag { return flag; } + public static int buildSysFlag(final boolean commitOffset, final boolean suspend, + final boolean subscription, final boolean classFilter, final boolean litePull) { + int flag = buildSysFlag(commitOffset, suspend, subscription, classFilter); + + if (litePull) { + flag |= FLAG_LITE_PULL_MESSAGE; + } + + return flag; + } + public static int clearCommitOffsetFlag(final int sysFlag) { return sysFlag & (~FLAG_COMMIT_OFFSET); } @@ -64,4 +76,8 @@ public class PullSysFlag { public static boolean hasClassFilterFlag(final int sysFlag) { return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER; } + + public static boolean hasLitePullFlag(final int sysFlag) { + return (sysFlag & FLAG_LITE_PULL_MESSAGE) == FLAG_LITE_PULL_MESSAGE; + } } diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java b/common/src/test/java/org/apache/rocketmq/common/sysflag/PullSysFlagTest.java similarity index 50% copy from example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java copy to common/src/test/java/org/apache/rocketmq/common/sysflag/PullSysFlagTest.java index 1bfe49d..60e1812 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java +++ b/common/src/test/java/org/apache/rocketmq/common/sysflag/PullSysFlagTest.java @@ -14,27 +14,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.example.simple; +package org.apache.rocketmq.common.sysflag; -import java.util.List; -import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; -import org.apache.rocketmq.common.message.MessageExt; +import org.junit.Test; -public class LitePullConsumerSubscribe { +import static org.assertj.core.api.Assertions.assertThat; - public static volatile boolean running = true; +public class PullSysFlagTest { - public static void main(String[] args) throws Exception { - DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name"); - litePullConsumer.subscribe("TopicTest", "*"); - litePullConsumer.start(); - try { - while (running) { - List<MessageExt> messageExts = litePullConsumer.poll(); - System.out.printf("%s%n", messageExts); - } - } finally { - litePullConsumer.shutdown(); - } + @Test + public void testLitePullFlag() { + int flag = PullSysFlag.buildSysFlag(false, false, false, false, true); + assertThat(PullSysFlag.hasLitePullFlag(flag)).isTrue(); + } + + @Test + public void testLitePullFlagFalse() { + int flag = PullSysFlag.buildSysFlag(false, false, false, false, false); + assertThat(PullSysFlag.hasLitePullFlag(flag)).isFalse(); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java index 1bfe49d..e5c1a61 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.example.simple; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class LitePullConsumerSubscribe { @@ -25,7 +26,8 @@ public class LitePullConsumerSubscribe { public static volatile boolean running = true; public static void main(String[] args) throws Exception { - DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name"); + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test"); + litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); litePullConsumer.subscribe("TopicTest", "*"); litePullConsumer.start(); try {
