This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f1b856a feat(pullconsumer) add pull sys flag (#1658)
f1b856a is described below
commit f1b856a091a3ee441fb0fe0b5a1d5892b7fafad0
Author: Heng Du <[email protected]>
AuthorDate: Fri Dec 27 20:11:43 2019 +0800
feat(pullconsumer) add pull sys flag (#1658)
feat(lite_pull_consumer) add support for consume from where
test(pull_consumer) add unit test for compute from where
test(PullSysFlag) add lite pull sys flag unit test
feat(pull_consumer) add the verification of ConsumeFromWhere value
---
.../client/consumer/DefaultLitePullConsumer.java | 31 ++++++++++
.../client/impl/consumer/AssignedMessageQueue.java | 2 +-
.../impl/consumer/DefaultLitePullConsumerImpl.java | 17 +++---
.../client/impl/consumer/RebalanceImpl.java | 5 --
.../impl/consumer/RebalanceLitePullImpl.java | 71 ++++++++++++++++++++--
.../consumer/DefaultLitePullConsumerTest.java | 45 ++++++++++++++
.../rocketmq/common/sysflag/PullSysFlag.java | 16 +++++
.../rocketmq/common/sysflag/PullSysFlagTest.java | 32 +++++-----
.../example/simple/LitePullConsumerSubscribe.java | 4 +-
9 files changed, 184 insertions(+), 39 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 52060ea..782d29b 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -24,6 +24,8 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
@@ -138,6 +140,14 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
*/
private long topicMetadataCheckIntervalMillis = 30 * 1000;
+ private ConsumeFromWhere consumeFromWhere =
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
+
+ /**
+ * Backtracking consumption time with second precision. Time format is
20131223171201<br> which means 17:12:01
+ * on December 23, 2013.<br> The default backtracking
consumption time is half an hour ago.
+ */
+ private String consumeTimestamp =
UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
+
/**
* Default constructor.
*/
@@ -431,4 +441,25 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
+
+ public ConsumeFromWhere getConsumeFromWhere() {
+ return consumeFromWhere;
+ }
+
+ public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+ if (consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
+ && consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
+ && consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
+ throw new RuntimeException("Invalid ConsumeFromWhere Value", null);
+ }
+ this.consumeFromWhere = consumeFromWhere;
+ }
+
+ public String getConsumeTimestamp() {
+ return consumeTimestamp;
+ }
+
+ public void setConsumeTimestamp(String consumeTimestamp) {
+ this.consumeTimestamp = consumeTimestamp;
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index c0c6f60..0b090e3 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -90,7 +90,7 @@ public class AssignedMessageQueue {
}
}
- public long getConusmerOffset(MessageQueue messageQueue) {
+ public long getConsumerOffset(MessageQueue messageQueue) {
MessageQueueState messageQueueState =
assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
return messageQueueState.getConsumeOffset();
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index f44eea7..cd4d4cf 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -597,7 +597,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
public synchronized void commitSync() {
try {
for (MessageQueue messageQueue :
assignedMessageQueue.messageQueues()) {
- long consumerOffset =
assignedMessageQueue.getConusmerOffset(messageQueue);
+ long consumerOffset =
assignedMessageQueue.getConsumerOffset(messageQueue);
if (consumerOffset != -1) {
ProcessQueue processQueue =
assignedMessageQueue.getProcessQueue(messageQueue);
long preConsumerOffset =
this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
@@ -618,7 +618,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
private synchronized void commitAll() {
try {
for (MessageQueue messageQueue :
assignedMessageQueue.messageQueues()) {
- long consumerOffset =
assignedMessageQueue.getConusmerOffset(messageQueue);
+ long consumerOffset =
assignedMessageQueue.getConsumerOffset(messageQueue);
if (consumerOffset != -1) {
ProcessQueue processQueue =
assignedMessageQueue.getProcessQueue(messageQueue);
long preConsumerOffset =
this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
@@ -650,9 +650,10 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
}
- private long fetchConsumeOffset(MessageQueue messageQueue, boolean
fromStore) {
+ private long fetchConsumeOffset(MessageQueue messageQueue) {
checkServiceState();
- return this.offsetStore.readOffset(messageQueue, fromStore ?
ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+ long offset = this.rebalanceImpl.computePullFromWhere(messageQueue);
+ return offset;
}
public long committed(MessageQueue messageQueue) throws MQClientException {
@@ -685,10 +686,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
} else {
offset = assignedMessageQueue.getPullOffset(messageQueue);
if (offset == -1) {
- offset = fetchConsumeOffset(messageQueue, false);
- if (offset == -1 && defaultLitePullConsumer.getMessageModel()
== MessageModel.BROADCASTING) {
- offset = 0;
- }
+ offset = fetchConsumeOffset(messageQueue);
}
}
return offset;
@@ -779,7 +777,6 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
PullResult pullResult = pull(messageQueue,
subscriptionData, offset, nextPullBatchSize());
-
switch (pullResult.getPullStatus()) {
case FOUND:
final Object objLock =
messageQueueLock.fetchLockObject(messageQueue);
@@ -850,7 +847,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
throw new MQClientException("maxNums <= 0", null);
}
- int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
+ int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false,
true);
long timeoutMillis = block ?
this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 146fce6..b8972a9 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -40,11 +40,6 @@ import
org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-/**
- * This class will be removed in 2022, and a better implementation {@link
RebalanceLitePullImpl} is recommend to use
- * in the scenario of actively pulling messages.
- */
-@Deprecated
public abstract class RebalanceImpl {
protected static final InternalLogger log = ClientLogger.getLog();
protected final ConcurrentMap<MessageQueue, ProcessQueue>
processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 0b8ec67..9d1ea74 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -16,16 +16,20 @@
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.List;
+import java.util.Set;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import java.util.List;
-import java.util.Set;
-
public class RebalanceLitePullImpl extends RebalanceImpl {
private final DefaultLitePullConsumerImpl litePullConsumerImpl;
@@ -72,7 +76,66 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
@Override
public long computePullFromWhere(MessageQueue mq) {
- return 0;
+ ConsumeFromWhere consumeFromWhere =
litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeFromWhere();
+ long result = -1;
+ switch (consumeFromWhere) {
+ case CONSUME_FROM_LAST_OFFSET: {
+ long lastOffset =
litePullConsumerImpl.getOffsetStore().readOffset(mq,
ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+ if (lastOffset >= 0) {
+ result = lastOffset;
+ } else if (-1 == lastOffset) {
+ if
(mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // First start,
no offset
+ result = 0L;
+ } else {
+ try {
+ result =
this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+ } catch (MQClientException e) {
+ result = -1;
+ }
+ }
+ } else {
+ result = -1;
+ }
+ break;
+ }
+ case CONSUME_FROM_FIRST_OFFSET: {
+ long lastOffset =
litePullConsumerImpl.getOffsetStore().readOffset(mq,
ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+ if (lastOffset >= 0) {
+ result = lastOffset;
+ } else if (-1 == lastOffset) {
+ result = 0L;
+ } else {
+ result = -1;
+ }
+ break;
+ }
+ case CONSUME_FROM_TIMESTAMP: {
+ long lastOffset =
litePullConsumerImpl.getOffsetStore().readOffset(mq,
ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+ if (lastOffset >= 0) {
+ result = lastOffset;
+ } else if (-1 == lastOffset) {
+ if
(mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ try {
+ result =
this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+ } catch (MQClientException e) {
+ result = -1;
+ }
+ } else {
+ try {
+ long timestamp =
UtilAll.parseDate(this.litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeTimestamp(),
+ UtilAll.YYYYMMDDHHMMSS).getTime();
+ result =
this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
+ } catch (MQClientException e) {
+ result = -1;
+ }
+ }
+ } else {
+ result = -1;
+ }
+ break;
+ }
+ }
+ return result;
}
@Override
diff --git
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index d2cb057..cc8d5e2 100644
---
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -41,6 +41,7 @@ import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.consumer.RebalanceService;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
@@ -419,6 +420,50 @@ public class DefaultLitePullConsumerTest {
}
+ @Test
+ public void testComputePullFromWhereReturnedNotFound() throws Exception{
+ DefaultLitePullConsumer defaultLitePullConsumer =
createStartLitePullConsumer();
+
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ MessageQueue messageQueue = createMessageQueue();
+ when(offsetStore.readOffset(any(MessageQueue.class),
any(ReadOffsetType.class))).thenReturn(-1L);
+ long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+ assertThat(offset).isEqualTo(0);
+ }
+
+ @Test
+ public void testComputePullFromWhereReturned() throws Exception{
+ DefaultLitePullConsumer defaultLitePullConsumer =
createStartLitePullConsumer();
+
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ MessageQueue messageQueue = createMessageQueue();
+ when(offsetStore.readOffset(any(MessageQueue.class),
any(ReadOffsetType.class))).thenReturn(100L);
+ long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+ assertThat(offset).isEqualTo(100);
+ }
+
+
+ @Test
+ public void testComputePullFromLast() throws Exception{
+ DefaultLitePullConsumer defaultLitePullConsumer =
createStartLitePullConsumer();
+
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+ MessageQueue messageQueue = createMessageQueue();
+ when(offsetStore.readOffset(any(MessageQueue.class),
any(ReadOffsetType.class))).thenReturn(-1L);
+
when(mQClientFactory.getMQAdminImpl().maxOffset(any(MessageQueue.class))).thenReturn(100L);
+ long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+ assertThat(offset).isEqualTo(100);
+ }
+
+ @Test
+ public void testComputePullByTimeStamp() throws Exception{
+ DefaultLitePullConsumer defaultLitePullConsumer =
createStartLitePullConsumer();
+
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
+ defaultLitePullConsumer.setConsumeTimestamp("20191024171201");
+ MessageQueue messageQueue = createMessageQueue();
+ when(offsetStore.readOffset(any(MessageQueue.class),
any(ReadOffsetType.class))).thenReturn(-1L);
+
when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class),anyLong())).thenReturn(100L);
+ long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+ assertThat(offset).isEqualTo(100);
+ }
+
private void initDefaultLitePullConsumer(DefaultLitePullConsumer
litePullConsumer) throws Exception {
Field field =
DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
diff --git
a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
index d476a35..ce7558f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
@@ -21,6 +21,7 @@ public class PullSysFlag {
private final static int FLAG_SUSPEND = 0x1 << 1;
private final static int FLAG_SUBSCRIPTION = 0x1 << 2;
private final static int FLAG_CLASS_FILTER = 0x1 << 3;
+ private final static int FLAG_LITE_PULL_MESSAGE = 0x1 << 4;
public static int buildSysFlag(final boolean commitOffset, final boolean
suspend,
final boolean subscription, final boolean classFilter) {
@@ -45,6 +46,17 @@ public class PullSysFlag {
return flag;
}
+ public static int buildSysFlag(final boolean commitOffset, final boolean
suspend,
+ final boolean subscription, final boolean classFilter, final boolean
litePull) {
+ int flag = buildSysFlag(commitOffset, suspend, subscription,
classFilter);
+
+ if (litePull) {
+ flag |= FLAG_LITE_PULL_MESSAGE;
+ }
+
+ return flag;
+ }
+
public static int clearCommitOffsetFlag(final int sysFlag) {
return sysFlag & (~FLAG_COMMIT_OFFSET);
}
@@ -64,4 +76,8 @@ public class PullSysFlag {
public static boolean hasClassFilterFlag(final int sysFlag) {
return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER;
}
+
+ public static boolean hasLitePullFlag(final int sysFlag) {
+ return (sysFlag & FLAG_LITE_PULL_MESSAGE) == FLAG_LITE_PULL_MESSAGE;
+ }
}
diff --git
a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
b/common/src/test/java/org/apache/rocketmq/common/sysflag/PullSysFlagTest.java
similarity index 50%
copy from
example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
copy to
common/src/test/java/org/apache/rocketmq/common/sysflag/PullSysFlagTest.java
index 1bfe49d..60e1812 100644
---
a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
+++
b/common/src/test/java/org/apache/rocketmq/common/sysflag/PullSysFlagTest.java
@@ -14,27 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.example.simple;
+package org.apache.rocketmq.common.sysflag;
-import java.util.List;
-import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
-import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Test;
-public class LitePullConsumerSubscribe {
+import static org.assertj.core.api.Assertions.assertThat;
- public static volatile boolean running = true;
+public class PullSysFlagTest {
- public static void main(String[] args) throws Exception {
- DefaultLitePullConsumer litePullConsumer = new
DefaultLitePullConsumer("please_rename_unique_group_name");
- litePullConsumer.subscribe("TopicTest", "*");
- litePullConsumer.start();
- try {
- while (running) {
- List<MessageExt> messageExts = litePullConsumer.poll();
- System.out.printf("%s%n", messageExts);
- }
- } finally {
- litePullConsumer.shutdown();
- }
+ @Test
+ public void testLitePullFlag() {
+ int flag = PullSysFlag.buildSysFlag(false, false, false, false, true);
+ assertThat(PullSysFlag.hasLitePullFlag(flag)).isTrue();
+ }
+
+ @Test
+ public void testLitePullFlagFalse() {
+ int flag = PullSysFlag.buildSysFlag(false, false, false, false, false);
+ assertThat(PullSysFlag.hasLitePullFlag(flag)).isFalse();
}
}
diff --git
a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
index 1bfe49d..e5c1a61 100644
---
a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
+++
b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.example.simple;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class LitePullConsumerSubscribe {
@@ -25,7 +26,8 @@ public class LitePullConsumerSubscribe {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
- DefaultLitePullConsumer litePullConsumer = new
DefaultLitePullConsumer("please_rename_unique_group_name");
+ DefaultLitePullConsumer litePullConsumer = new
DefaultLitePullConsumer("lite_pull_consumer_test");
+
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {