This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new c159a67d [ISSUE #1150] [Java] Support OffsetOption for subscribeLite 
(#1151)
c159a67d is described below

commit c159a67d62c175027a366774940843c04e96a8ab
Author: Quan <[email protected]>
AuthorDate: Wed Jan 14 11:03:34 2026 +0800

    [ISSUE #1150] [Java] Support OffsetOption for subscribeLite (#1151)
    
    Change-Id: Icca22d58883c467e74c357e4f7d7aaa719f2e524
    Co-authored-by: 靖泉 <[email protected]>
---
 .../client/apis/consumer/LitePushConsumer.java     |  10 +-
 .../client/apis/consumer/OffsetOption.java         | 101 +++++++++++++++++++++
 .../java/impl/consumer/LitePushConsumerImpl.java   |  62 +++++++++++--
 .../impl/consumer/LitePushConsumerImplTest.java    |  86 +++++++++++++++++-
 4 files changed, 246 insertions(+), 13 deletions(-)

diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumer.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumer.java
index 2cf276b3..b35428c9 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumer.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumer.java
@@ -35,11 +35,19 @@ public interface LitePushConsumer extends Closeable {
      *    evaluate whether the quota is insufficient and promptly unsubscribe 
from unused subscriptions
      *    using unsubscribeLite() to free up resources.
      *
-     * @param liteTopic the name of the lite topic to subscribe to
+     * @param liteTopic the name of the lite topic to subscribe
      * @throws ClientException if an error occurs during subscription
      */
     void subscribeLite(String liteTopic) throws ClientException;
 
+    /**
+     *  Subscribe to a lite topic with consumeFromOption to specify the 
consume from offset.
+     * @param liteTopic the name of the lite topic to subscribe
+     * @param offsetOption the consume from offset
+     * @throws ClientException if an error occurs during subscription
+     */
+    void subscribeLite(String liteTopic, OffsetOption offsetOption) throws 
ClientException;
+
     /**
      * Unsubscribe from a lite topic.
      *
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/OffsetOption.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/OffsetOption.java
new file mode 100644
index 00000000..9e409447
--- /dev/null
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/OffsetOption.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.apis.consumer;
+
+import com.google.common.base.MoreObjects;
+import java.util.Objects;
+
+public class OffsetOption {
+
+    public static final long POLICY_LAST_VALUE = 0L;
+    public static final long POLICY_MIN_VALUE = 1L;
+    public static final long POLICY_MAX_VALUE = 2L;
+
+    public static final OffsetOption LAST_OFFSET = new 
OffsetOption(Type.POLICY, POLICY_LAST_VALUE);
+    public static final OffsetOption MIN_OFFSET = new 
OffsetOption(Type.POLICY, POLICY_MIN_VALUE);
+    public static final OffsetOption MAX_OFFSET = new 
OffsetOption(Type.POLICY, POLICY_MAX_VALUE);
+
+    private final Type type;
+    private final long value;
+
+    private OffsetOption(Type type, long value) {
+        this.type = type;
+        this.value = value;
+    }
+
+    public static OffsetOption ofOffset(long offset) {
+        if (offset < 0) {
+            throw new IllegalArgumentException("offset must be greater than or 
equal to 0");
+        }
+        return new OffsetOption(Type.OFFSET, offset);
+    }
+
+    public static OffsetOption ofTailN(long tailN) {
+        if (tailN < 0) {
+            throw new IllegalArgumentException("tailN must be greater than or 
equal to 0");
+        }
+        return new OffsetOption(Type.TAIL_N, tailN);
+    }
+
+    public static OffsetOption ofTimestamp(long timestamp) {
+        if (timestamp < 0) {
+            throw new IllegalArgumentException("timestamp must be greater than 
or equal to 0");
+        }
+        return new OffsetOption(Type.TIMESTAMP, timestamp);
+    }
+
+    public Type getType() {
+        return type;
+    }
+
+    public long getValue() {
+        return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        OffsetOption option = (OffsetOption) o;
+        return value == option.value && type == option.type;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hashCode(type);
+        result = 31 * result + Long.hashCode(value);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("type", type)
+            .add("value", value)
+            .toString();
+    }
+
+    public enum Type {
+        POLICY,
+        OFFSET,
+        TAIL_N,
+        TIMESTAMP
+    }
+
+}
\ No newline at end of file
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
index 30429a8f..f5a049a8 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
@@ -40,6 +40,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.LitePushConsumer;
+import org.apache.rocketmq.client.apis.consumer.OffsetOption;
 import 
org.apache.rocketmq.client.java.exception.LiteSubscriptionQuotaExceededException;
 import org.apache.rocketmq.client.java.exception.StatusChecker;
 import org.apache.rocketmq.client.java.route.Endpoints;
@@ -83,6 +84,11 @@ public class LitePushConsumerImpl extends PushConsumerImpl 
implements LitePushCo
 
     @Override
     public void subscribeLite(String liteTopic) throws ClientException {
+        subscribeLite(liteTopic, null);
+    }
+
+    @Override
+    public void subscribeLite(String liteTopic, OffsetOption offsetOption) 
throws ClientException {
         checkRunning();
         if (litePushConsumerSettings.containsLiteTopic(liteTopic)) {
             return;
@@ -90,7 +96,8 @@ public class LitePushConsumerImpl extends PushConsumerImpl 
implements LitePushCo
         validateLiteTopic(liteTopic);
         checkLiteSubscriptionQuota(1);
         ListenableFuture<Void> future =
-            syncLiteSubscription(LiteSubscriptionAction.PARTIAL_ADD, 
Collections.singleton(liteTopic));
+            syncLiteSubscription(LiteSubscriptionAction.PARTIAL_ADD,
+                Collections.singleton(liteTopic), offsetOption);
         try {
             handleClientFuture(future);
         } catch (ClientException e) {
@@ -128,7 +135,8 @@ public class LitePushConsumerImpl extends PushConsumerImpl 
implements LitePushCo
             return;
         }
         ListenableFuture<Void> future =
-            syncLiteSubscription(LiteSubscriptionAction.PARTIAL_REMOVE, 
Collections.singleton(liteTopic));
+            syncLiteSubscription(LiteSubscriptionAction.PARTIAL_REMOVE,
+                Collections.singleton(liteTopic), null);
         try {
             handleClientFuture(future);
         } catch (ClientException e) {
@@ -149,19 +157,57 @@ public class LitePushConsumerImpl extends 
PushConsumerImpl implements LitePushCo
     protected void syncAllLiteSubscription() throws ClientException {
         checkLiteSubscriptionQuota(0);
         final Set<String> set = litePushConsumerSettings.getLiteTopicSet();
-        ListenableFuture<Void> future = 
syncLiteSubscription(LiteSubscriptionAction.COMPLETE_ADD, set);
+        ListenableFuture<Void> future = 
syncLiteSubscription(LiteSubscriptionAction.COMPLETE_ADD, set, null);
         handleClientFuture(future);
     }
 
-    protected ListenableFuture<Void> 
syncLiteSubscription(LiteSubscriptionAction action, Collection<String> diff) {
-        SyncLiteSubscriptionRequest request = 
SyncLiteSubscriptionRequest.newBuilder()
+    protected ListenableFuture<Void> syncLiteSubscription(
+        LiteSubscriptionAction action,
+        Collection<String> diff,
+        OffsetOption offsetOption
+    ) {
+        SyncLiteSubscriptionRequest.Builder builder = 
SyncLiteSubscriptionRequest.newBuilder()
             .setAction(action)
             .setTopic(litePushConsumerSettings.bindTopic.toProtobuf())
             .setGroup(litePushConsumerSettings.group.toProtobuf())
-            .addAllLiteTopicSet(diff)
-            .build();
+            .addAllLiteTopicSet(diff);
+        if (offsetOption != null) {
+            builder.setOffsetOption(toProtobufOffsetOption(offsetOption));
+        }
         Endpoints endpoints = getEndpoints();
-        return syncLiteSubscription0(endpoints, request);
+        return syncLiteSubscription0(endpoints, builder.build());
+    }
+
+    protected apache.rocketmq.v2.OffsetOption 
toProtobufOffsetOption(OffsetOption offsetOption) {
+        apache.rocketmq.v2.OffsetOption.Builder protoBuilder = 
apache.rocketmq.v2.OffsetOption.newBuilder();
+        switch (offsetOption.getType()) {
+            case POLICY:
+                
protoBuilder.setPolicy(toProtobufPolicy(offsetOption.getValue()));
+                break;
+            case OFFSET:
+                protoBuilder.setOffset(offsetOption.getValue());
+                break;
+            case TAIL_N:
+                protoBuilder.setTailN(offsetOption.getValue());
+                break;
+            case TIMESTAMP:
+                protoBuilder.setTimestamp(offsetOption.getValue());
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown OffsetOption type: 
" + offsetOption.getType());
+        }
+        return protoBuilder.build();
+    }
+
+    protected apache.rocketmq.v2.OffsetOption.Policy toProtobufPolicy(long 
policyValue) {
+        if (policyValue == OffsetOption.POLICY_LAST_VALUE) {
+            return apache.rocketmq.v2.OffsetOption.Policy.LAST;
+        } else if (policyValue == OffsetOption.POLICY_MIN_VALUE) {
+            return apache.rocketmq.v2.OffsetOption.Policy.MIN;
+        } else if (policyValue == OffsetOption.POLICY_MAX_VALUE) {
+            return apache.rocketmq.v2.OffsetOption.Policy.MAX;
+        }
+        throw new IllegalArgumentException("Unknown policy type: " + 
policyValue);
     }
 
     protected ListenableFuture<Void> syncLiteSubscription0(Endpoints 
endpoints, SyncLiteSubscriptionRequest request) {
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImplTest.java
index b71ab166..a6232161 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImplTest.java
@@ -35,14 +35,17 @@ import apache.rocketmq.v2.LiteSubscriptionAction;
 import com.google.common.util.concurrent.Futures;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.consumer.OffsetOption;
 import 
org.apache.rocketmq.client.java.exception.LiteSubscriptionQuotaExceededException;
 import org.apache.rocketmq.client.java.misc.ClientId;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
+import org.mockito.junit.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class LitePushConsumerImplTest {
 
     final String endpoints = "127.0.0.1:8080";
@@ -63,7 +66,6 @@ public class LitePushConsumerImplTest {
 
         spySettings = Mockito.spy(realSettings);
 
-        MockitoAnnotations.openMocks(this);
         consumer = mock(LitePushConsumerImpl.class, CALLS_REAL_METHODS);
         // Set final field litePushConsumerSettings using reflection
         try {
@@ -93,7 +95,7 @@ public class LitePushConsumerImplTest {
 
         verify(consumer).checkRunning();
         verify(spySettings).containsLiteTopic(liteTopic);
-        verify(consumer, never()).syncLiteSubscription(any(), any());
+        verify(consumer, never()).syncLiteSubscription(any(), any(), any());
     }
 
     @Test
@@ -102,7 +104,7 @@ public class LitePushConsumerImplTest {
         String liteTopic2 = "testLiteTopic2";
         doNothing().when(consumer).checkRunning();
         doReturn(Futures.immediateVoidFuture()).when(consumer)
-            .syncLiteSubscription(any(LiteSubscriptionAction.class), 
anyCollection());
+            .syncLiteSubscription(any(LiteSubscriptionAction.class), 
anyCollection(), any());
         when(spySettings.getLiteSubscriptionQuota()).thenReturn(1);
 
         consumer.subscribeLite(liteTopic1);
@@ -122,4 +124,80 @@ public class LitePushConsumerImplTest {
         verify(spySettings, times(1)).removeLiteTopic(liteTopic1);
         verify(spySettings, times(1)).addLiteTopic(liteTopic2);
     }
+
+    @Test
+    public void testToProtobufOffsetOptionWithPolicy() {
+        OffsetOption offsetOption = OffsetOption.LAST_OFFSET;
+        apache.rocketmq.v2.OffsetOption protobufOffsetOption = 
consumer.toProtobufOffsetOption(offsetOption);
+        assertThat(protobufOffsetOption.hasPolicy()).isTrue();
+        
assertThat(protobufOffsetOption.getPolicy()).isEqualTo(apache.rocketmq.v2.OffsetOption.Policy.LAST);
+        assertThat(protobufOffsetOption.hasOffset()).isFalse();
+        assertThat(protobufOffsetOption.hasTailN()).isFalse();
+        assertThat(protobufOffsetOption.hasTimestamp()).isFalse();
+    }
+
+    @Test
+    public void testToProtobufOffsetOptionWithOffset() {
+        long offsetValue = 100L;
+        OffsetOption offsetOption = OffsetOption.ofOffset(offsetValue);
+        apache.rocketmq.v2.OffsetOption protobufOffsetOption = 
consumer.toProtobufOffsetOption(offsetOption);
+        assertThat(protobufOffsetOption.hasOffset()).isTrue();
+        assertThat(protobufOffsetOption.getOffset()).isEqualTo(offsetValue);
+        assertThat(protobufOffsetOption.hasPolicy()).isFalse();
+        assertThat(protobufOffsetOption.hasTailN()).isFalse();
+        assertThat(protobufOffsetOption.hasTimestamp()).isFalse();
+    }
+
+    @Test
+    public void testToProtobufOffsetOptionWithTailN() {
+        long tailNValue = 5L;
+        OffsetOption offsetOption = OffsetOption.ofTailN(tailNValue);
+        apache.rocketmq.v2.OffsetOption protobufOffsetOption = 
consumer.toProtobufOffsetOption(offsetOption);
+        assertThat(protobufOffsetOption.hasTailN()).isTrue();
+        assertThat(protobufOffsetOption.getTailN()).isEqualTo(tailNValue);
+        assertThat(protobufOffsetOption.hasPolicy()).isFalse();
+        assertThat(protobufOffsetOption.hasOffset()).isFalse();
+        assertThat(protobufOffsetOption.hasTimestamp()).isFalse();
+    }
+
+    @Test
+    public void testToProtobufOffsetOptionWithTimestamp() {
+        long timestampValue = System.currentTimeMillis();
+        OffsetOption offsetOption = OffsetOption.ofTimestamp(timestampValue);
+        apache.rocketmq.v2.OffsetOption protobufOffsetOption = 
consumer.toProtobufOffsetOption(offsetOption);
+        assertThat(protobufOffsetOption.hasTimestamp()).isTrue();
+        
assertThat(protobufOffsetOption.getTimestamp()).isEqualTo(timestampValue);
+        assertThat(protobufOffsetOption.hasPolicy()).isFalse();
+        assertThat(protobufOffsetOption.hasOffset()).isFalse();
+        assertThat(protobufOffsetOption.hasTailN()).isFalse();
+    }
+
+    @Test
+    public void testToProtobufPolicyWithLast() {
+        long policyValue = OffsetOption.POLICY_LAST_VALUE;
+        apache.rocketmq.v2.OffsetOption.Policy policy = 
consumer.toProtobufPolicy(policyValue);
+        
assertThat(policy).isEqualTo(apache.rocketmq.v2.OffsetOption.Policy.LAST);
+    }
+
+    @Test
+    public void testToProtobufPolicyWithMin() {
+        long policyValue = OffsetOption.POLICY_MIN_VALUE;
+        apache.rocketmq.v2.OffsetOption.Policy policy = 
consumer.toProtobufPolicy(policyValue);
+        
assertThat(policy).isEqualTo(apache.rocketmq.v2.OffsetOption.Policy.MIN);
+    }
+
+    @Test
+    public void testToProtobufPolicyWithMax() {
+        long policyValue = OffsetOption.POLICY_MAX_VALUE;
+        apache.rocketmq.v2.OffsetOption.Policy policy = 
consumer.toProtobufPolicy(policyValue);
+        
assertThat(policy).isEqualTo(apache.rocketmq.v2.OffsetOption.Policy.MAX);
+    }
+
+    @Test
+    public void testToProtobufPolicyWithUnknownValue() {
+        long unknownPolicyValue = 999L;
+        assertThatThrownBy(() -> consumer.toProtobufPolicy(unknownPolicyValue))
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessageContaining("Unknown policy type");
+    }
 }

Reply via email to