This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new c159a67d [ISSUE #1150] [Java] Support OffsetOption for subscribeLite
(#1151)
c159a67d is described below
commit c159a67d62c175027a366774940843c04e96a8ab
Author: Quan <[email protected]>
AuthorDate: Wed Jan 14 11:03:34 2026 +0800
[ISSUE #1150] [Java] Support OffsetOption for subscribeLite (#1151)
Change-Id: Icca22d58883c467e74c357e4f7d7aaa719f2e524
Co-authored-by: 靖泉 <[email protected]>
---
.../client/apis/consumer/LitePushConsumer.java | 10 +-
.../client/apis/consumer/OffsetOption.java | 101 +++++++++++++++++++++
.../java/impl/consumer/LitePushConsumerImpl.java | 62 +++++++++++--
.../impl/consumer/LitePushConsumerImplTest.java | 86 +++++++++++++++++-
4 files changed, 246 insertions(+), 13 deletions(-)
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumer.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumer.java
index 2cf276b3..b35428c9 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumer.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumer.java
@@ -35,11 +35,19 @@ public interface LitePushConsumer extends Closeable {
* evaluate whether the quota is insufficient and promptly unsubscribe
from unused subscriptions
* using unsubscribeLite() to free up resources.
*
- * @param liteTopic the name of the lite topic to subscribe to
+ * @param liteTopic the name of the lite topic to subscribe
* @throws ClientException if an error occurs during subscription
*/
void subscribeLite(String liteTopic) throws ClientException;
+ /**
+ * Subscribe to a lite topic, using the given offsetOption to specify the
offset from which consumption starts.
+ * @param liteTopic the name of the lite topic to subscribe
+ * @param offsetOption the option specifying the offset to start consuming from
+ * @throws ClientException if an error occurs during subscription
+ */
+ void subscribeLite(String liteTopic, OffsetOption offsetOption) throws
ClientException;
+
/**
* Unsubscribe from a lite topic.
*
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/OffsetOption.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/OffsetOption.java
new file mode 100644
index 00000000..9e409447
--- /dev/null
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/OffsetOption.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.apis.consumer;
+
+import com.google.common.base.MoreObjects;
+import java.util.Objects;
+
+public class OffsetOption {
+
+ public static final long POLICY_LAST_VALUE = 0L;
+ public static final long POLICY_MIN_VALUE = 1L;
+ public static final long POLICY_MAX_VALUE = 2L;
+
+ public static final OffsetOption LAST_OFFSET = new
OffsetOption(Type.POLICY, POLICY_LAST_VALUE);
+ public static final OffsetOption MIN_OFFSET = new
OffsetOption(Type.POLICY, POLICY_MIN_VALUE);
+ public static final OffsetOption MAX_OFFSET = new
OffsetOption(Type.POLICY, POLICY_MAX_VALUE);
+
+ private final Type type;
+ private final long value;
+
+ private OffsetOption(Type type, long value) {
+ this.type = type;
+ this.value = value;
+ }
+
+ public static OffsetOption ofOffset(long offset) {
+ if (offset < 0) {
+ throw new IllegalArgumentException("offset must be greater than or
equal to 0");
+ }
+ return new OffsetOption(Type.OFFSET, offset);
+ }
+
+ public static OffsetOption ofTailN(long tailN) {
+ if (tailN < 0) {
+ throw new IllegalArgumentException("tailN must be greater than or
equal to 0");
+ }
+ return new OffsetOption(Type.TAIL_N, tailN);
+ }
+
+ public static OffsetOption ofTimestamp(long timestamp) {
+ if (timestamp < 0) {
+ throw new IllegalArgumentException("timestamp must be greater than
or equal to 0");
+ }
+ return new OffsetOption(Type.TIMESTAMP, timestamp);
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ public long getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OffsetOption option = (OffsetOption) o;
+ return value == option.value && type == option.type;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hashCode(type);
+ result = 31 * result + Long.hashCode(value);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("type", type)
+ .add("value", value)
+ .toString();
+ }
+
+ public enum Type {
+ POLICY,
+ OFFSET,
+ TAIL_N,
+ TIMESTAMP
+ }
+
+}
\ No newline at end of file
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
index 30429a8f..f5a049a8 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
@@ -40,6 +40,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.LitePushConsumer;
+import org.apache.rocketmq.client.apis.consumer.OffsetOption;
import
org.apache.rocketmq.client.java.exception.LiteSubscriptionQuotaExceededException;
import org.apache.rocketmq.client.java.exception.StatusChecker;
import org.apache.rocketmq.client.java.route.Endpoints;
@@ -83,6 +84,11 @@ public class LitePushConsumerImpl extends PushConsumerImpl
implements LitePushCo
@Override
public void subscribeLite(String liteTopic) throws ClientException {
+ subscribeLite(liteTopic, null);
+ }
+
+ @Override
+ public void subscribeLite(String liteTopic, OffsetOption offsetOption)
throws ClientException {
checkRunning();
if (litePushConsumerSettings.containsLiteTopic(liteTopic)) {
return;
@@ -90,7 +96,8 @@ public class LitePushConsumerImpl extends PushConsumerImpl
implements LitePushCo
validateLiteTopic(liteTopic);
checkLiteSubscriptionQuota(1);
ListenableFuture<Void> future =
- syncLiteSubscription(LiteSubscriptionAction.PARTIAL_ADD,
Collections.singleton(liteTopic));
+ syncLiteSubscription(LiteSubscriptionAction.PARTIAL_ADD,
+ Collections.singleton(liteTopic), offsetOption);
try {
handleClientFuture(future);
} catch (ClientException e) {
@@ -128,7 +135,8 @@ public class LitePushConsumerImpl extends PushConsumerImpl
implements LitePushCo
return;
}
ListenableFuture<Void> future =
- syncLiteSubscription(LiteSubscriptionAction.PARTIAL_REMOVE,
Collections.singleton(liteTopic));
+ syncLiteSubscription(LiteSubscriptionAction.PARTIAL_REMOVE,
+ Collections.singleton(liteTopic), null);
try {
handleClientFuture(future);
} catch (ClientException e) {
@@ -149,19 +157,57 @@ public class LitePushConsumerImpl extends
PushConsumerImpl implements LitePushCo
protected void syncAllLiteSubscription() throws ClientException {
checkLiteSubscriptionQuota(0);
final Set<String> set = litePushConsumerSettings.getLiteTopicSet();
- ListenableFuture<Void> future =
syncLiteSubscription(LiteSubscriptionAction.COMPLETE_ADD, set);
+ ListenableFuture<Void> future =
syncLiteSubscription(LiteSubscriptionAction.COMPLETE_ADD, set, null);
handleClientFuture(future);
}
- protected ListenableFuture<Void>
syncLiteSubscription(LiteSubscriptionAction action, Collection<String> diff) {
- SyncLiteSubscriptionRequest request =
SyncLiteSubscriptionRequest.newBuilder()
+ protected ListenableFuture<Void> syncLiteSubscription(
+ LiteSubscriptionAction action,
+ Collection<String> diff,
+ OffsetOption offsetOption
+ ) {
+ SyncLiteSubscriptionRequest.Builder builder =
SyncLiteSubscriptionRequest.newBuilder()
.setAction(action)
.setTopic(litePushConsumerSettings.bindTopic.toProtobuf())
.setGroup(litePushConsumerSettings.group.toProtobuf())
- .addAllLiteTopicSet(diff)
- .build();
+ .addAllLiteTopicSet(diff);
+ if (offsetOption != null) {
+ builder.setOffsetOption(toProtobufOffsetOption(offsetOption));
+ }
Endpoints endpoints = getEndpoints();
- return syncLiteSubscription0(endpoints, request);
+ return syncLiteSubscription0(endpoints, builder.build());
+ }
+
+ protected apache.rocketmq.v2.OffsetOption
toProtobufOffsetOption(OffsetOption offsetOption) {
+ apache.rocketmq.v2.OffsetOption.Builder protoBuilder =
apache.rocketmq.v2.OffsetOption.newBuilder();
+ switch (offsetOption.getType()) {
+ case POLICY:
+
protoBuilder.setPolicy(toProtobufPolicy(offsetOption.getValue()));
+ break;
+ case OFFSET:
+ protoBuilder.setOffset(offsetOption.getValue());
+ break;
+ case TAIL_N:
+ protoBuilder.setTailN(offsetOption.getValue());
+ break;
+ case TIMESTAMP:
+ protoBuilder.setTimestamp(offsetOption.getValue());
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown OffsetOption type:
" + offsetOption.getType());
+ }
+ return protoBuilder.build();
+ }
+
+ protected apache.rocketmq.v2.OffsetOption.Policy toProtobufPolicy(long
policyValue) {
+ if (policyValue == OffsetOption.POLICY_LAST_VALUE) {
+ return apache.rocketmq.v2.OffsetOption.Policy.LAST;
+ } else if (policyValue == OffsetOption.POLICY_MIN_VALUE) {
+ return apache.rocketmq.v2.OffsetOption.Policy.MIN;
+ } else if (policyValue == OffsetOption.POLICY_MAX_VALUE) {
+ return apache.rocketmq.v2.OffsetOption.Policy.MAX;
+ }
+ throw new IllegalArgumentException("Unknown policy type: " +
policyValue);
}
protected ListenableFuture<Void> syncLiteSubscription0(Endpoints
endpoints, SyncLiteSubscriptionRequest request) {
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImplTest.java
index b71ab166..a6232161 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImplTest.java
@@ -35,14 +35,17 @@ import apache.rocketmq.v2.LiteSubscriptionAction;
import com.google.common.util.concurrent.Futures;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.consumer.OffsetOption;
import
org.apache.rocketmq.client.java.exception.LiteSubscriptionQuotaExceededException;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
+import org.mockito.junit.MockitoJUnitRunner;
+@RunWith(MockitoJUnitRunner.class)
public class LitePushConsumerImplTest {
final String endpoints = "127.0.0.1:8080";
@@ -63,7 +66,6 @@ public class LitePushConsumerImplTest {
spySettings = Mockito.spy(realSettings);
- MockitoAnnotations.openMocks(this);
consumer = mock(LitePushConsumerImpl.class, CALLS_REAL_METHODS);
// Set final field litePushConsumerSettings using reflection
try {
@@ -93,7 +95,7 @@ public class LitePushConsumerImplTest {
verify(consumer).checkRunning();
verify(spySettings).containsLiteTopic(liteTopic);
- verify(consumer, never()).syncLiteSubscription(any(), any());
+ verify(consumer, never()).syncLiteSubscription(any(), any(), any());
}
@Test
@@ -102,7 +104,7 @@ public class LitePushConsumerImplTest {
String liteTopic2 = "testLiteTopic2";
doNothing().when(consumer).checkRunning();
doReturn(Futures.immediateVoidFuture()).when(consumer)
- .syncLiteSubscription(any(LiteSubscriptionAction.class),
anyCollection());
+ .syncLiteSubscription(any(LiteSubscriptionAction.class),
anyCollection(), any());
when(spySettings.getLiteSubscriptionQuota()).thenReturn(1);
consumer.subscribeLite(liteTopic1);
@@ -122,4 +124,80 @@ public class LitePushConsumerImplTest {
verify(spySettings, times(1)).removeLiteTopic(liteTopic1);
verify(spySettings, times(1)).addLiteTopic(liteTopic2);
}
+
+ @Test
+ public void testToProtobufOffsetOptionWithPolicy() {
+ OffsetOption offsetOption = OffsetOption.LAST_OFFSET;
+ apache.rocketmq.v2.OffsetOption protobufOffsetOption =
consumer.toProtobufOffsetOption(offsetOption);
+ assertThat(protobufOffsetOption.hasPolicy()).isTrue();
+
assertThat(protobufOffsetOption.getPolicy()).isEqualTo(apache.rocketmq.v2.OffsetOption.Policy.LAST);
+ assertThat(protobufOffsetOption.hasOffset()).isFalse();
+ assertThat(protobufOffsetOption.hasTailN()).isFalse();
+ assertThat(protobufOffsetOption.hasTimestamp()).isFalse();
+ }
+
+ @Test
+ public void testToProtobufOffsetOptionWithOffset() {
+ long offsetValue = 100L;
+ OffsetOption offsetOption = OffsetOption.ofOffset(offsetValue);
+ apache.rocketmq.v2.OffsetOption protobufOffsetOption =
consumer.toProtobufOffsetOption(offsetOption);
+ assertThat(protobufOffsetOption.hasOffset()).isTrue();
+ assertThat(protobufOffsetOption.getOffset()).isEqualTo(offsetValue);
+ assertThat(protobufOffsetOption.hasPolicy()).isFalse();
+ assertThat(protobufOffsetOption.hasTailN()).isFalse();
+ assertThat(protobufOffsetOption.hasTimestamp()).isFalse();
+ }
+
+ @Test
+ public void testToProtobufOffsetOptionWithTailN() {
+ long tailNValue = 5L;
+ OffsetOption offsetOption = OffsetOption.ofTailN(tailNValue);
+ apache.rocketmq.v2.OffsetOption protobufOffsetOption =
consumer.toProtobufOffsetOption(offsetOption);
+ assertThat(protobufOffsetOption.hasTailN()).isTrue();
+ assertThat(protobufOffsetOption.getTailN()).isEqualTo(tailNValue);
+ assertThat(protobufOffsetOption.hasPolicy()).isFalse();
+ assertThat(protobufOffsetOption.hasOffset()).isFalse();
+ assertThat(protobufOffsetOption.hasTimestamp()).isFalse();
+ }
+
+ @Test
+ public void testToProtobufOffsetOptionWithTimestamp() {
+ long timestampValue = System.currentTimeMillis();
+ OffsetOption offsetOption = OffsetOption.ofTimestamp(timestampValue);
+ apache.rocketmq.v2.OffsetOption protobufOffsetOption =
consumer.toProtobufOffsetOption(offsetOption);
+ assertThat(protobufOffsetOption.hasTimestamp()).isTrue();
+
assertThat(protobufOffsetOption.getTimestamp()).isEqualTo(timestampValue);
+ assertThat(protobufOffsetOption.hasPolicy()).isFalse();
+ assertThat(protobufOffsetOption.hasOffset()).isFalse();
+ assertThat(protobufOffsetOption.hasTailN()).isFalse();
+ }
+
+ @Test
+ public void testToProtobufPolicyWithLast() {
+ long policyValue = OffsetOption.POLICY_LAST_VALUE;
+ apache.rocketmq.v2.OffsetOption.Policy policy =
consumer.toProtobufPolicy(policyValue);
+
assertThat(policy).isEqualTo(apache.rocketmq.v2.OffsetOption.Policy.LAST);
+ }
+
+ @Test
+ public void testToProtobufPolicyWithMin() {
+ long policyValue = OffsetOption.POLICY_MIN_VALUE;
+ apache.rocketmq.v2.OffsetOption.Policy policy =
consumer.toProtobufPolicy(policyValue);
+
assertThat(policy).isEqualTo(apache.rocketmq.v2.OffsetOption.Policy.MIN);
+ }
+
+ @Test
+ public void testToProtobufPolicyWithMax() {
+ long policyValue = OffsetOption.POLICY_MAX_VALUE;
+ apache.rocketmq.v2.OffsetOption.Policy policy =
consumer.toProtobufPolicy(policyValue);
+
assertThat(policy).isEqualTo(apache.rocketmq.v2.OffsetOption.Policy.MAX);
+ }
+
+ @Test
+ public void testToProtobufPolicyWithUnknownValue() {
+ long unknownPolicyValue = 999L;
+ assertThatThrownBy(() -> consumer.toProtobufPolicy(unknownPolicyValue))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unknown policy type");
+ }
}