This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new ef216972 [ISSUE #1107] [Java] LiteTopic support for "RIP‐83 Lite 
Topic" (#1108)
ef216972 is described below

commit ef216972730c47eeaaf7575a1d125bfd4fbabe4e
Author: Quan <[email protected]>
AuthorDate: Mon Oct 20 11:47:57 2025 +0800

    [ISSUE #1107] [Java] LiteTopic support for "RIP‐83 Lite Topic" (#1108)
    
    * producer lite topic
    
    * new MessageImpl with MessageBuilderImpl
    
    * lite push consumer
    
    * createFifoConsumeService
    
    * default fifo true for LitePushConsumerSettings
    
    * lite push consumer sub bindTopic *
    
    * 1. fix heartbeat;
    2. add version for interest;
    
    * upgrade to 5.0.9-lite-topic-SNAPSHOT and fix logback
    
    * add some test
    
    * subscribeLite List<String>
    
    * Collection instead of List
    
    * check point
    
    * syncLiteSubscription
    
    * code refactor
    
    * fix check style
    
    * onNotifyUnsubscribeLiteCommand
    
    * LiteSubscriptionQuotaExceededException and LiteTopicQuotaExceededException
    
    * subscribeLite atomic
    
    * client side quota check
    
    * client-side lite subscription quota limit
    
    * checkLiteSubscriptionQuota 3000
    
    * validateLiteTopic for lite push consumer
    
    * ack with lite topic if present.
    
    * forward syncSettings exception to upper layer
    
    * set liteSubscriptionQuota 1200
    
    * lite push consumer setInvisibleDuration
    
    * LitePushConsumerSettings#toString more fields
    
    * revert InvisibleDuration
    
    * log when subscribeLite raise ClientException
    
    * LitePushConsumerImplTest
    
    * clearer LitePushConsumer interface
    
    * remove subscribeLite(Collection<String>)
    
    * code refactor and more unit test
    
    * remove setEnableFifoConsumeAccelerator
    
    * code recover
    
    * fix
    
    * fix
    
    * fix code style
    
    * add liteTopic for ForwardMessageToDeadLetterQueueRequest
    
    * fix normal group consume lite topic
    
    * log client type
    
    * notify unsubscribe lite with liteTopic only
    
    * fix test
    
    * complete lite example
    
    * LitePushConsumer#getLiteTopicSet
    
    * fix comments
    
    * remove fifo = true
    
    * log warn for onNotifyUnsubscribeLiteCommand default implementation
    
    * use new proto and update to 5.1.0-lite-topic-SNAPSHOT
    
    Change-Id: I01f0c9c64be2614d0215560240548bba2078e505
    
    * log when subscribeLite and unsubscribeLite
    
    Change-Id: I01cb9cd4d9a474218e674662407708cf069bf426
    
    * revert pom config
    
    Change-Id: I7cc3f5624bf73b8cfeb902f4e0fc3b1fc7d4062d
    
    * rocketmq-proto.version to 2.1.0-SNAPSHOT
    
    Change-Id: I686e9ab7eaed1af1d1b01a57a88438af4bfa1878
    
    * liteSubscriptionQuota default to 0
    
    Change-Id: I3bcfaaa323b1eea3bd0665ab1d8722a7bc47a4fe
    
    * upgrade to 5.1.0-SNAPSHOT
    
    Change-Id: I360dbd38de8fa2cc21ab4ebb827fcbdce0258fa6
    
    * add comments
    
    Change-Id: I062b4a92032788b6c361304d45b6a40e9a28923a
    
    * add comments
    
    Change-Id: I53a21ec7faa0ba85d46c020a1709f4e011c799ec
    
    ---------
    
    Co-authored-by: Quan <[email protected]>
    Co-authored-by: moling <[email protected]>
---
 java/client-apis/pom.xml                           |   2 +-
 .../client/apis/ClientServiceProvider.java         |   8 +
 .../client/apis/consumer/LitePushConsumer.java     |  73 +++++++
 .../apis/consumer/LitePushConsumerBuilder.java     |  93 +++++++++
 .../rocketmq/client/apis/message/Message.java      |   7 +
 .../client/apis/message/MessageBuilder.java        |   8 +
 .../rocketmq/client/apis/message/MessageView.java  |   7 +
 java/client-shade/pom.xml                          |   2 +-
 java/client/pom.xml                                |   2 +-
 .../client/java/example/LiteProducerExample.java   |  66 +++++++
 .../java/example/LitePushConsumerExample.java      | 100 ++++++++++
 .../LiteSubscriptionQuotaExceededException.java}   |  21 +-
 .../LiteTopicQuotaExceededException.java}          |  21 +-
 .../client/java/exception/StatusChecker.java       |   5 +
 .../rocketmq/client/java/impl/ClientImpl.java      |  12 ++
 .../rocketmq/client/java/impl/ClientManager.java   |  16 ++
 .../client/java/impl/ClientManagerImpl.java        |  20 ++
 .../java/impl/ClientServiceProviderImpl.java       |  14 +-
 .../client/java/impl/ClientSessionImpl.java        |  21 +-
 .../rocketmq/client/java/impl/ClientType.java      |   4 +
 .../apache/rocketmq/client/java/impl/Settings.java |   4 +
 .../client/java/impl/consumer/ConsumerImpl.java    |  17 +-
 .../impl/consumer/LitePushConsumerBuilderImpl.java | 108 ++++++++++
 .../java/impl/consumer/LitePushConsumerImpl.java   | 220 +++++++++++++++++++++
 .../impl/consumer/LitePushConsumerSettings.java    | 127 ++++++++++++
 .../java/impl/consumer/ProcessQueueImpl.java       |   6 +-
 .../java/impl/consumer/PushConsumerImpl.java       |  55 +++---
 .../impl/consumer/PushSubscriptionSettings.java    |  25 ++-
 .../java/impl/consumer/SimpleConsumerImpl.java     |   8 -
 .../java/impl/producer/ClientSessionHandler.java   |   6 +
 .../client/java/message/GeneralMessage.java        |   7 +
 .../client/java/message/GeneralMessageImpl.java    |   8 +
 .../client/java/message/MessageBuilderImpl.java    |  28 ++-
 .../rocketmq/client/java/message/MessageImpl.java  |  27 ++-
 .../rocketmq/client/java/message/MessageType.java  |   5 +
 .../client/java/message/MessageViewImpl.java       |  20 +-
 .../client/java/message/PublishingMessageImpl.java |   7 +
 .../rocketmq/client/java/route/Endpoints.java      |  10 +-
 .../apache/rocketmq/client/java/rpc/RpcClient.java |  14 ++
 .../rocketmq/client/java/rpc/RpcClientImpl.java    |  12 ++
 .../client/java/ClientServiceProviderImplTest.java |   7 +
 .../client/java/exception/StatusCheckerTest.java   |  27 +++
 .../client/java/impl/ClientManagerImplTest.java    |  10 +
 .../client/java/impl/ClientSessionImplTest.java    |  24 +++
 .../consumer/LitePushConsumerBuilderImplTest.java  | 124 ++++++++++++
 .../impl/consumer/LitePushConsumerImplTest.java    | 125 ++++++++++++
 .../java/impl/consumer/ProcessQueueImplTest.java   |   2 +-
 .../consumer/PushSubscriptionSettingsTest.java     |  43 ++--
 .../java/message/GeneralMessageImplTest.java       | 132 ++++++++++++-
 .../client/java/message/MessageImplTest.java       |  17 ++
 .../apache/rocketmq/client/java/tool/TestBase.java |   2 +-
 java/pom.xml                                       |   4 +-
 java/test/pom.xml                                  |   2 +-
 53 files changed, 1589 insertions(+), 146 deletions(-)

diff --git a/java/client-apis/pom.xml b/java/client-apis/pom.xml
index 807af5a6..57f9a55e 100644
--- a/java/client-apis/pom.xml
+++ b/java/client-apis/pom.xml
@@ -18,7 +18,7 @@
     <parent>
         <artifactId>rocketmq-client-java-parent</artifactId>
         <groupId>org.apache.rocketmq</groupId>
-        <version>5.0.9-SNAPSHOT</version>
+        <version>5.1.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientServiceProvider.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientServiceProvider.java
index 523f1f8c..723ca2e2 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientServiceProvider.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientServiceProvider.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.apis;
 
 import java.util.Iterator;
 import java.util.ServiceLoader;
+import org.apache.rocketmq.client.apis.consumer.LitePushConsumerBuilder;
 import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
 import org.apache.rocketmq.client.apis.message.MessageBuilder;
@@ -59,6 +60,13 @@ public interface ClientServiceProvider {
      */
     PushConsumerBuilder newPushConsumerBuilder();
 
+    /**
+     * Get the lite push consumer builder by the current provider.
+     *
+     * @return the lite push consumer builder instance.
+     */
+    LitePushConsumerBuilder newLitePushConsumerBuilder();
+
     /**
      * Get the simple consumer builder by the current provider.
      *
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumer.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumer.java
new file mode 100644
index 00000000..2cf276b3
--- /dev/null
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.apis.consumer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+import org.apache.rocketmq.client.apis.ClientException;
+
+public interface LitePushConsumer extends Closeable {
+
+    /**
+     * Subscribe to a lite topic.
+     * <p>
+     * The subscribeLite() method initiates network requests and performs 
quota verification, so it may fail.
+     * It's important to check the result of this call to ensure that the 
subscription was successfully added.
+     * Possible failure scenarios include:
+     * 1. Network request errors, which can be retried.
+     * 2. Quota verification failures, indicated by 
LiteSubscriptionQuotaExceededException. In this case,
+     *    evaluate whether the quota is insufficient and promptly unsubscribe 
from unused subscriptions
+     *    using unsubscribeLite() to free up resources.
+     *
+     * @param liteTopic the name of the lite topic to subscribe to
+     * @throws ClientException if an error occurs during subscription
+     */
+    void subscribeLite(String liteTopic) throws ClientException;
+
+    /**
+     * Unsubscribe from a lite topic.
+     *
+     * @param liteTopic the name of the lite topic to unsubscribe from
+     * @throws ClientException if an error occurs during unsubscription
+     */
+    void unsubscribeLite(String liteTopic) throws ClientException;
+
+    /**
+     * Get the immutable set of lite topics.
+     *
+     * @return an immutable set of lite topics.
+     */
+    Set<String> getLiteTopicSet();
+
+    /**
+     * Get the load balancing group for the consumer.
+     *
+     * @return consumer load balancing group.
+     */
+    String getConsumerGroup();
+
+    /**
+     * Close the consumer and release all related resources.
+     *
+     * <p>Once the consumer is closed, <strong>it cannot be started 
again.</strong> We maintain an FSM
+     * (finite-state machine) to record the different states for each push 
consumer.
+     */
+    @Override
+    void close() throws IOException;
+}
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumerBuilder.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumerBuilder.java
new file mode 100644
index 00000000..3e526bc2
--- /dev/null
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/LitePushConsumerBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.apis.consumer;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+
+/**
+ * Builder to config and start {@link LitePushConsumer}.
+ */
+public interface LitePushConsumerBuilder {
+
+    /**
+     * Set the bind topic for lite push consumer.
+     *
+     * @return the consumer builder instance.
+     */
+    LitePushConsumerBuilder bindTopic(String bindTopic);
+
+    /**
+     * Set the client configuration for the consumer.
+     *
+     * @param clientConfiguration client's configuration.
+     * @return the consumer builder instance.
+     */
+    LitePushConsumerBuilder setClientConfiguration(ClientConfiguration 
clientConfiguration);
+
+    /**
+     * Set the load balancing group for the consumer.
+     *
+     * @param consumerGroup consumer load balancing group.
+     * @return the consumer builder instance.
+     */
+    LitePushConsumerBuilder setConsumerGroup(String consumerGroup);
+
+    /**
+     * Register a message listener; all messages that meet the subscription 
expression will be passed to this listener.
+     *
+     * @param listener message listener.
+     * @return the consumer builder instance.
+     */
+    LitePushConsumerBuilder setMessageListener(MessageListener listener);
+
+    /**
+     * Set the maximum number of messages cached locally.
+     *
+     * @param count message count.
+     * @return the consumer builder instance.
+     */
+    LitePushConsumerBuilder setMaxCacheMessageCount(int count);
+
+    /**
+     * Set the maximum bytes of messages cached locally.
+     *
+     * @param bytes message size.
+     * @return the consumer builder instance.
+     */
+    LitePushConsumerBuilder setMaxCacheMessageSizeInBytes(int bytes);
+
+    /**
+     * Set the consumption thread count in parallel.
+     *
+     * @param count thread count.
+     * @return the consumer builder instance.
+     */
+    LitePushConsumerBuilder setConsumptionThreadCount(int count);
+
+    /**
+     * Finalize the build of {@link LitePushConsumer} and start.
+     *
+     * <p>This method will block until the push consumer starts successfully.
+     *
+     * <p>Especially, if this method is invoked more than once, different push 
consumers will be created and started.
+     *
+     * @return the lite push consumer instance.
+     */
+    LitePushConsumer build() throws ClientException;
+}
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
index d08b9c7a..e82be932 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
@@ -72,6 +72,13 @@ public interface Message {
      */
     Optional<String> getMessageGroup();
 
+    /**
+     * Get the lite topic, which is used for lite topic message type.
+     *
+     * @return lite topic, which is optional, {@link Optional#empty()} means 
lite topic is not specified.
+     */
+    Optional<String> getLiteTopic();
+
     /**
      * Get the expected delivery timestamp, which make sense only when topic 
type is delay.
      *
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
index 8275e881..3ccea469 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
@@ -87,6 +87,14 @@ public interface MessageBuilder {
      */
     MessageBuilder setMessageGroup(String messageGroup);
 
+    /**
+     * Set the lite topic for the message, which is optional.
+     *
+     * @param liteTopic lite topic for the message.
+     * @return the message builder instance.
+     */
+    MessageBuilder setLiteTopic(String liteTopic);
+
     /**
      * Set the delivery timestamp for the message, which is optional.
      *
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
index dded0d4b..2f7909ae 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
@@ -79,6 +79,13 @@ public interface MessageView {
      */
     Optional<String> getMessageGroup();
 
+    /**
+     * Get the lite topic, which makes sense only when the topic type is LITE.
+     *
+     * @return lite topic, which is optional, {@link Optional#empty()} means 
lite topic is not specified.
+     */
+    Optional<String> getLiteTopic();
+
     /**
      * Get the expected delivery timestamp, which makes sense only when the 
topic type is delay.
      *
diff --git a/java/client-shade/pom.xml b/java/client-shade/pom.xml
index 57d633ea..241da9cf 100644
--- a/java/client-shade/pom.xml
+++ b/java/client-shade/pom.xml
@@ -18,7 +18,7 @@
     <parent>
         <artifactId>rocketmq-client-java-parent</artifactId>
         <groupId>org.apache.rocketmq</groupId>
-        <version>5.0.9-SNAPSHOT</version>
+        <version>5.1.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
diff --git a/java/client/pom.xml b/java/client/pom.xml
index 11b2f424..2d16677c 100644
--- a/java/client/pom.xml
+++ b/java/client/pom.xml
@@ -18,7 +18,7 @@
     <parent>
         <artifactId>rocketmq-client-java-parent</artifactId>
         <groupId>org.apache.rocketmq</groupId>
-        <version>5.0.9-SNAPSHOT</version>
+        <version>5.1.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/LiteProducerExample.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/LiteProducerExample.java
new file mode 100644
index 00000000..e841f28c
--- /dev/null
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/LiteProducerExample.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import 
org.apache.rocketmq.client.java.exception.LiteTopicQuotaExceededException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LiteProducerExample {
+    static final Logger log = 
LoggerFactory.getLogger(LiteProducerExample.class);
+
+    private LiteProducerExample() {
+    }
+
+    public static void main(String[] args) throws ClientException {
+        final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
+
+        String topic = "yourParentTopic";
+        final Producer producer = ProducerSingleton.getInstance(topic);
+        // Define your message body.
+        byte[] body = "This is a lite message for Apache 
RocketMQ".getBytes(StandardCharsets.UTF_8);
+        final Message message = provider.newMessageBuilder()
+            // Set topic for the current message.
+            .setTopic(topic)
+            // Key(s) of the message, another way to mark message besides 
message id.
+            .setKeys("yourMessageKey-3ee439f945d7")
+            // Set your lite topic
+            .setLiteTopic("lite-topic-1")
+            .setBody(body)
+            .build();
+        try {
+            final SendReceipt sendReceipt = producer.send(message);
+            log.info("Send message successfully, messageId={}", 
sendReceipt.getMessageId());
+        } catch (LiteTopicQuotaExceededException e) {
+            // Lite topic quota exceeded.
+            // Evaluate and increase the lite topic resource limit.
+            log.error("Lite topic quota exceeded", e);
+        } catch (Throwable t) {
+            log.error("Failed to send message", t);
+        }
+        // Close the producer when you don't need it anymore.
+        // You could close it manually or add this into the JVM shutdown hook.
+        // producer.close();
+    }
+}
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/LitePushConsumerExample.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/LitePushConsumerExample.java
new file mode 100644
index 00000000..ff990d4c
--- /dev/null
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/LitePushConsumerExample.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import java.io.IOException;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.LitePushConsumer;
+import 
org.apache.rocketmq.client.java.exception.LiteSubscriptionQuotaExceededException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LitePushConsumerExample {
+    private static final Logger log = 
LoggerFactory.getLogger(LitePushConsumerExample.class);
+
+    private LitePushConsumerExample() {
+    }
+
+    public static void main(String[] args) throws ClientException, 
InterruptedException, IOException {
+        final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
+
+        // Credential provider is optional for client configuration.
+        String accessKey = "yourAccessKey";
+        String secretKey = "yourSecretKey";
+        SessionCredentialsProvider sessionCredentialsProvider =
+            new StaticSessionCredentialsProvider(accessKey, secretKey);
+
+        String endpoints = "foobar.com:8080";
+        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
+            .setEndpoints(endpoints)
+            // On some Windows platforms, you may encounter SSL compatibility 
issues. If SSL is not essential, try
+            // turning off the SSL option in the client configuration to solve 
the problem.
+            // .enableSsl(false)
+            .setCredentialProvider(sessionCredentialsProvider)
+            .build();
+        String consumerGroup = "yourConsumerGroup";
+        String topic = "yourParentTopic";
+        // In most cases, you don't need to create many consumers; the 
singleton pattern is recommended.
+        LitePushConsumer litePushConsumer = 
provider.newLitePushConsumerBuilder()
+            .setClientConfiguration(clientConfiguration)
+            // Set the consumer group name.
+            .setConsumerGroup(consumerGroup)
+            // Bind to the parent topic
+            .bindTopic(topic)
+            .setMessageListener(messageView -> {
+                // Handle the received message and return consume result.
+                log.info("Consume message={}", messageView);
+                return ConsumeResult.SUCCESS;
+            })
+            .build();
+
+        try {
+            /*
+            The subscribeLite() method initiates network requests and performs 
quota verification, so it may fail.
+            It's important to check the result of this call to ensure that the 
subscription was successfully added.
+            Possible failure scenarios include:
+            1. Network request errors, which can be retried.
+            2. Quota verification failures, indicated by 
LiteSubscriptionQuotaExceededException. In this case,
+               evaluate whether the quota is insufficient and promptly 
unsubscribe from unused subscriptions
+               using unsubscribeLite() to free up resources.
+            */
+            litePushConsumer.subscribeLite("lite-topic-1");
+            litePushConsumer.subscribeLite("lite-topic-2");
+            litePushConsumer.subscribeLite("lite-topic-3");
+        } catch (LiteSubscriptionQuotaExceededException e) {
+            // 1. Evaluate and increase the lite topic resource limit.
+            // 2. Unsubscribe unused lite topics in time
+            // litePushConsumer.unsubscribeLite("lite-topic-3");
+            log.error("Lite subscription quota exceeded", e);
+        } catch (Throwable t) {
+            // should retry later
+            log.error("Failed to subscribe lite topic", t);
+        }
+
+        // Block the main thread, no need for production environment.
+        Thread.sleep(Long.MAX_VALUE);
+        // Close the push consumer when you don't need it anymore.
+        // You could close it manually or add this into the JVM shutdown hook.
+        litePushConsumer.close();
+    }
+}
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/LiteSubscriptionQuotaExceededException.java
similarity index 57%
copy from 
java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
copy to 
java/client/src/main/java/org/apache/rocketmq/client/java/exception/LiteSubscriptionQuotaExceededException.java
index d514a8ac..eb17a279 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/LiteSubscriptionQuotaExceededException.java
@@ -15,23 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.client.java.impl;
+package org.apache.rocketmq.client.java.exception;
 
-public enum ClientType {
-    PRODUCER,
-    PUSH_CONSUMER,
-    SIMPLE_CONSUMER;
+import org.apache.rocketmq.client.apis.ClientException;
 
-    public apache.rocketmq.v2.ClientType toProtobuf() {
-        if (PRODUCER.equals(this)) {
-            return apache.rocketmq.v2.ClientType.PRODUCER;
-        }
-        if (PUSH_CONSUMER.equals(this)) {
-            return apache.rocketmq.v2.ClientType.PUSH_CONSUMER;
-        }
-        if (SIMPLE_CONSUMER.equals(this)) {
-            return apache.rocketmq.v2.ClientType.SIMPLE_CONSUMER;
-        }
-        return apache.rocketmq.v2.ClientType.CLIENT_TYPE_UNSPECIFIED;
+public class LiteSubscriptionQuotaExceededException extends ClientException {
+    public LiteSubscriptionQuotaExceededException(int responseCode, String 
requestId, String message) {
+        super(responseCode, requestId, message);
     }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/LiteTopicQuotaExceededException.java
similarity index 57%
copy from 
java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
copy to 
java/client/src/main/java/org/apache/rocketmq/client/java/exception/LiteTopicQuotaExceededException.java
index d514a8ac..72494bf8 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/LiteTopicQuotaExceededException.java
@@ -15,23 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.client.java.impl;
+package org.apache.rocketmq.client.java.exception;
 
-public enum ClientType {
-    PRODUCER,
-    PUSH_CONSUMER,
-    SIMPLE_CONSUMER;
+import org.apache.rocketmq.client.apis.ClientException;
 
-    public apache.rocketmq.v2.ClientType toProtobuf() {
-        if (PRODUCER.equals(this)) {
-            return apache.rocketmq.v2.ClientType.PRODUCER;
-        }
-        if (PUSH_CONSUMER.equals(this)) {
-            return apache.rocketmq.v2.ClientType.PUSH_CONSUMER;
-        }
-        if (SIMPLE_CONSUMER.equals(this)) {
-            return apache.rocketmq.v2.ClientType.SIMPLE_CONSUMER;
-        }
-        return apache.rocketmq.v2.ClientType.CLIENT_TYPE_UNSPECIFIED;
+public class LiteTopicQuotaExceededException extends ClientException {
+    public LiteTopicQuotaExceededException(int responseCode, String requestId, 
String message) {
+        super(responseCode, requestId, message);
     }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
index 58e9cfc0..746e49da 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
@@ -48,6 +48,7 @@ public class StatusChecker {
             case ILLEGAL_MESSAGE_TAG:
             case ILLEGAL_MESSAGE_KEY:
             case ILLEGAL_MESSAGE_GROUP:
+            case ILLEGAL_LITE_TOPIC:
             case ILLEGAL_MESSAGE_PROPERTY_KEY:
             case INVALID_TRANSACTION_ID:
             case ILLEGAL_MESSAGE_ID:
@@ -83,6 +84,10 @@ public class StatusChecker {
                 throw new PayloadEmptyException(codeNumber, requestId, 
statusMessage);
             case TOO_MANY_REQUESTS:
                 throw new TooManyRequestsException(codeNumber, requestId, 
statusMessage);
+            case LITE_TOPIC_QUOTA_EXCEEDED:
+                throw new LiteTopicQuotaExceededException(codeNumber, 
requestId, statusMessage);
+            case LITE_SUBSCRIPTION_QUOTA_EXCEEDED:
+                throw new LiteSubscriptionQuotaExceededException(codeNumber, 
requestId, statusMessage);
             case REQUEST_HEADER_FIELDS_TOO_LARGE:
             case MESSAGE_PROPERTIES_TOO_LARGE:
                 throw new RequestHeaderFieldsTooLargeException(codeNumber, 
requestId, statusMessage);
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 80e943fe..f17832bf 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -24,6 +24,7 @@ import apache.rocketmq.v2.HeartbeatRequest;
 import apache.rocketmq.v2.HeartbeatResponse;
 import apache.rocketmq.v2.MessageQueue;
 import apache.rocketmq.v2.NotifyClientTerminationRequest;
+import apache.rocketmq.v2.NotifyUnsubscribeLiteCommand;
 import apache.rocketmq.v2.PrintThreadStackTraceCommand;
 import apache.rocketmq.v2.QueryRouteRequest;
 import apache.rocketmq.v2.QueryRouteResponse;
@@ -466,6 +467,17 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
             + "command={}", clientId, command);
     }
 
+    /**
+     * This method is invoked when a request to unsubscribe from a lite topic is 
received from remote.
+     * @param endpoints remote endpoints.
+     * @param command   request of unsubscribe lite topic from remote.
+     */
+    @Override
+    public void onNotifyUnsubscribeLiteCommand(Endpoints endpoints, 
NotifyUnsubscribeLiteCommand command) {
+        log.warn("Ignore unsubscribe lite topic command from remote, which is 
not expected, clientId={}, "
+            + "command={}", clientId, command);
+    }
+
     private void updateRouteCache() {
         log.info("Start to update route cache for a new round, clientId={}", 
clientId);
         topicRouteCache.keySet().forEach(topic -> {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index e9ff2076..d8f5c22f 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -39,6 +39,8 @@ import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
+import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
+import apache.rocketmq.v2.SyncLiteSubscriptionResponse;
 import apache.rocketmq.v2.TelemetryCommand;
 import com.google.common.util.concurrent.AbstractIdleService;
 import io.grpc.stub.StreamObserver;
@@ -182,6 +184,20 @@ public abstract class ClientManager extends 
AbstractIdleService {
     public abstract RpcFuture<RecallMessageRequest, RecallMessageResponse> 
recallMessage(Endpoints endpoints,
         RecallMessageRequest request, Duration duration);
 
+    /**
+     * Sync lite subscription asynchronously; the method never throws and reports failures via the returned future.
+     *
+     * @param endpoints request endpoints.
+     * @param request   request.
+     * @param duration  request max duration.
+     * @return invocation of response future.
+     */
+    public abstract RpcFuture<SyncLiteSubscriptionRequest, 
SyncLiteSubscriptionResponse> syncLiteSubscription(
+        Endpoints endpoints,
+        SyncLiteSubscriptionRequest request,
+        Duration duration
+    );
+
     /**
      * Establish telemetry session stream to server.
      *
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index 76cfb617..d0fd49bb 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -39,6 +39,8 @@ import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
+import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
+import apache.rocketmq.v2.SyncLiteSubscriptionResponse;
 import apache.rocketmq.v2.TelemetryCommand;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
@@ -361,6 +363,24 @@ public class ClientManagerImpl extends ClientManager {
         }
     }
 
+    @Override
+    public RpcFuture<SyncLiteSubscriptionRequest, 
SyncLiteSubscriptionResponse> syncLiteSubscription(
+        Endpoints endpoints,
+        SyncLiteSubscriptionRequest request,
+        Duration duration
+    ) {
+        try {
+            final Metadata metadata = client.sign();
+            final Context context = new Context(endpoints, metadata);
+            final RpcClient rpcClient = getRpcClient(endpoints);
+            final ListenableFuture<SyncLiteSubscriptionResponse> future =
+                rpcClient.syncLiteSubscription(metadata, request, asyncWorker, 
duration);
+            return new RpcFuture<>(context, request, future);
+        } catch (Throwable t) {
+            return new RpcFuture<>(t);
+        }
+    }
+
     @Override
     public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, 
Duration duration,
         StreamObserver<TelemetryCommand> responseObserver) throws 
ClientException {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientServiceProviderImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientServiceProviderImpl.java
index f502b8f0..ee75bad7 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientServiceProviderImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientServiceProviderImpl.java
@@ -18,10 +18,12 @@
 package org.apache.rocketmq.client.java.impl;
 
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.consumer.LitePushConsumerBuilder;
 import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
 import org.apache.rocketmq.client.apis.message.MessageBuilder;
 import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
+import 
org.apache.rocketmq.client.java.impl.consumer.LitePushConsumerBuilderImpl;
 import org.apache.rocketmq.client.java.impl.consumer.PushConsumerBuilderImpl;
 import org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerBuilderImpl;
 import org.apache.rocketmq.client.java.impl.producer.ProducerBuilderImpl;
@@ -37,7 +39,7 @@ public class ClientServiceProviderImpl implements 
ClientServiceProvider {
     }
 
     /**
-     * @see ClientServiceProvider#newMessageBuilder()
+     * @see ClientServiceProvider#newPushConsumerBuilder()
      */
     @Override
     public PushConsumerBuilder newPushConsumerBuilder() {
@@ -45,7 +47,15 @@ public class ClientServiceProviderImpl implements 
ClientServiceProvider {
     }
 
     /**
-     * @see ClientServiceProvider#newMessageBuilder()
+     * @see ClientServiceProvider#newLitePushConsumerBuilder()
+     */
+    @Override
+    public LitePushConsumerBuilder newLitePushConsumerBuilder() {
+        return new LitePushConsumerBuilderImpl();
+    }
+
+    /**
+     * @see ClientServiceProvider#newSimpleConsumerBuilder()
      */
     @Override
     public SimpleConsumerBuilder newSimpleConsumerBuilder() {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index 29dd770c..f0c7de3e 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.client.java.impl;
 
+import apache.rocketmq.v2.NotifyUnsubscribeLiteCommand;
 import apache.rocketmq.v2.PrintThreadStackTraceCommand;
 import apache.rocketmq.v2.ReconnectEndpointsCommand;
 import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
@@ -46,7 +47,7 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
 
     private final ClientSessionHandler sessionHandler;
     private final Endpoints endpoints;
-    private final SettableFuture<Settings> future;
+    private final SettableFuture<Settings> settingsInitFuture;
     private volatile StreamObserver<TelemetryCommand> requestObserver;
 
     @SuppressWarnings("UnstableApiUsage")
@@ -54,8 +55,8 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
         throws ClientException {
         this.sessionHandler = sessionHandler;
         this.endpoints = endpoints;
-        this.future = SettableFuture.create();
-        Futures.withTimeout(future, 
SETTINGS_INITIALIZATION_TIMEOUT.plus(tolerance).toMillis(),
+        this.settingsInitFuture = SettableFuture.create();
+        Futures.withTimeout(settingsInitFuture, 
SETTINGS_INITIALIZATION_TIMEOUT.plus(tolerance).toMillis(),
             TimeUnit.MILLISECONDS, sessionHandler.getScheduler());
         this.requestObserver = sessionHandler.telemetry(endpoints, this);
     }
@@ -85,7 +86,7 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
 
     protected ListenableFuture<Settings> syncSettings() {
         this.syncSettings0();
-        return future;
+        return settingsInitFuture;
     }
 
     private void syncSettings0() {
@@ -129,7 +130,7 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
                     final Settings settings = command.getSettings();
                     log.info("Receive settings from remote, endpoints={}, 
clientId={}", endpoints, clientId);
                     sessionHandler.onSettingsCommand(endpoints, settings);
-                    if (future.set(settings)) {
+                    if (settingsInitFuture.set(settings)) {
                         log.info("Init settings successfully, endpoints={}, 
clientId={}", endpoints, clientId);
                     }
                     break;
@@ -165,6 +166,14 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
                     sessionHandler.onReconnectEndpointsCommand(endpoints, 
reconnectEndpointsCommand);
                     break;
                 }
+                case NOTIFY_UNSUBSCRIBE_LITE_COMMAND: {
+                    final NotifyUnsubscribeLiteCommand 
notifyUnsubscribeLiteCommand =
+                        command.getNotifyUnsubscribeLiteCommand();
+                    log.info("Receive notify unsubscribe lite command from 
remote, endpoints={}, "
+                        + "clientId={}", endpoints, clientId);
+                    sessionHandler.onNotifyUnsubscribeLiteCommand(endpoints, 
notifyUnsubscribeLiteCommand);
+                    break;
+                }
                 default:
                     log.warn("Receive unrecognized command from remote, 
endpoints={}, command={}, clientId={}",
                         endpoints, command, clientId);
@@ -182,6 +191,8 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
             throwable);
         release();
         if (!sessionHandler.isRunning()) {
+            // first time to sync settings, forward the exception to upper 
layer
+            settingsInitFuture.setException(throwable);
             log.info("Session handler is not running, forgive to renew request 
observer, clientId={}, "
                 + "endpoints={}", clientId, endpoints);
             return;
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
index d514a8ac..4eba5299 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.client.java.impl;
 public enum ClientType {
     PRODUCER,
     PUSH_CONSUMER,
+    LITE_PUSH_CONSUMER,
     SIMPLE_CONSUMER;
 
     public apache.rocketmq.v2.ClientType toProtobuf() {
@@ -29,6 +30,9 @@ public enum ClientType {
         if (PUSH_CONSUMER.equals(this)) {
             return apache.rocketmq.v2.ClientType.PUSH_CONSUMER;
         }
+        if (LITE_PUSH_CONSUMER.equals(this)) {
+            return apache.rocketmq.v2.ClientType.LITE_PUSH_CONSUMER;
+        }
         if (SIMPLE_CONSUMER.equals(this)) {
             return apache.rocketmq.v2.ClientType.SIMPLE_CONSUMER;
         }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
index 88b335c8..cc21b8b2 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
@@ -55,6 +55,10 @@ public abstract class Settings {
         return retryPolicy;
     }
 
+    public ClientType getClientType() {
+        return clientType;
+    }
+
     @ExcludeFromJacocoGeneratedReport
     @Override
     public String toString() {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 795c2cac..b3aee647 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -24,6 +24,7 @@ import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
 import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.FilterType;
+import apache.rocketmq.v2.HeartbeatRequest;
 import apache.rocketmq.v2.Message;
 import apache.rocketmq.v2.NotifyClientTerminationRequest;
 import apache.rocketmq.v2.ReceiveMessageRequest;
@@ -53,6 +54,7 @@ import 
org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
 import org.apache.rocketmq.client.java.impl.ClientImpl;
 import org.apache.rocketmq.client.java.impl.ClientManager;
+import org.apache.rocketmq.client.java.impl.ClientType;
 import org.apache.rocketmq.client.java.message.GeneralMessage;
 import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
@@ -127,10 +129,13 @@ abstract class ConsumerImpl extends ClientImpl {
             .setResourceNamespace(clientConfiguration.getNamespace())
             .setName(messageView.getTopic())
             .build();
-        final AckMessageEntry entry = AckMessageEntry.newBuilder()
+        final AckMessageEntry.Builder builder = AckMessageEntry.newBuilder()
             .setMessageId(messageView.getMessageId().toString())
-            .setReceiptHandle(messageView.getReceiptHandle())
-            .build();
+            .setReceiptHandle(messageView.getReceiptHandle());
+        if (ClientType.LITE_PUSH_CONSUMER == getSettings().getClientType()) {
+            messageView.getLiteTopic().ifPresent(builder::setLiteTopic);
+        }
+        final AckMessageEntry entry = builder.build();
         return 
AckMessageRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
             .addEntries(entry).build();
     }
@@ -267,4 +272,10 @@ abstract class ConsumerImpl extends ClientImpl {
             
.setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
             
.setBatchSize(batchSize).setAutoRenew(false).setInvisibleDuration(duration).build();
     }
+
+    @Override
+    public HeartbeatRequest wrapHeartbeatRequest() {
+        return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup())
+            .setClientType(getSettings().getClientType().toProtobuf()).build();
+    }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerBuilderImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerBuilderImpl.java
new file mode 100644
index 00000000..0f11eddc
--- /dev/null
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerBuilderImpl.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl.CONSUMER_GROUP_PATTERN;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.LitePushConsumer;
+import org.apache.rocketmq.client.apis.consumer.LitePushConsumerBuilder;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+
+public class LitePushConsumerBuilderImpl implements LitePushConsumerBuilder {
+
+    protected String bindTopic = null;
+    // the fields below are the same as in PushConsumerBuilderImpl
+    protected ClientConfiguration clientConfiguration = null;
+    protected String consumerGroup = null;
+    protected Map<String, FilterExpression> subscriptionExpressions = null;
+    protected MessageListener messageListener = null;
+    protected int maxCacheMessageCount = 1024;
+    protected int maxCacheMessageSizeInBytes = 64 * 1024 * 1024;
+    protected int consumptionThreadCount = 20;
+
+    @Override
+    public LitePushConsumerBuilder bindTopic(String bindTopic) {
+        checkArgument(StringUtils.isNotBlank(bindTopic), "bindTopic should not 
be blank");
+        this.bindTopic = bindTopic;
+        return this;
+    }
+
+    @Override
+    public LitePushConsumerBuilder setClientConfiguration(ClientConfiguration 
clientConfiguration) {
+        this.clientConfiguration = checkNotNull(clientConfiguration, 
"clientConfiguration should not be null");
+        return this;
+    }
+
+    @Override
+    public LitePushConsumerBuilder setConsumerGroup(String consumerGroup) {
+        checkNotNull(consumerGroup, "consumerGroup should not be null");
+        checkArgument(CONSUMER_GROUP_PATTERN.matcher(consumerGroup).matches(), 
"consumerGroup does not match the "
+            + "regex [regex=%s]", CONSUMER_GROUP_PATTERN.pattern());
+        this.consumerGroup = consumerGroup;
+        return this;
+    }
+
+    @Override
+    public LitePushConsumerBuilder setMessageListener(MessageListener 
messageListener) {
+        this.messageListener = checkNotNull(messageListener, "messageListener 
should not be null");
+        return this;
+    }
+
+    @Override
+    public LitePushConsumerBuilder setMaxCacheMessageCount(int 
maxCachedMessageCount) {
+        checkArgument(maxCachedMessageCount > 0, "maxCachedMessageCount should 
be positive");
+        this.maxCacheMessageCount = maxCachedMessageCount;
+        return this;
+    }
+
+    @Override
+    public LitePushConsumerBuilder setMaxCacheMessageSizeInBytes(int 
maxCacheMessageSizeInBytes) {
+        checkArgument(maxCacheMessageSizeInBytes > 0, 
"maxCacheMessageSizeInBytes should be positive");
+        this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;
+        return this;
+    }
+
+    @Override
+    public LitePushConsumerBuilder setConsumptionThreadCount(int 
consumptionThreadCount) {
+        checkArgument(consumptionThreadCount > 0, "consumptionThreadCount 
should be positive");
+        this.consumptionThreadCount = consumptionThreadCount;
+        return this;
+    }
+
+    @Override
+    public LitePushConsumer build() throws ClientException {
+        checkNotNull(clientConfiguration, "clientConfiguration has not been 
set yet");
+        checkNotNull(consumerGroup, "consumerGroup has not been set yet");
+        checkNotNull(messageListener, "messageListener has not been set yet");
+        checkNotNull(bindTopic, "bindTopic has not been set yet");
+        // passing bindTopic through subscriptionExpressions to ClientImpl
+        subscriptionExpressions = ImmutableMap.of(bindTopic, 
FilterExpression.SUB_ALL);
+        final LitePushConsumerImpl litePushConsumer = new 
LitePushConsumerImpl(this);
+        litePushConsumer.startAsync().awaitRunning();
+        return litePushConsumer;
+    }
+
+}
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
new file mode 100644
index 00000000..30429a8f
--- /dev/null
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import apache.rocketmq.v2.Code;
+import apache.rocketmq.v2.LiteSubscriptionAction;
+import apache.rocketmq.v2.NotifyUnsubscribeLiteCommand;
+import apache.rocketmq.v2.ReceiveMessageRequest;
+import apache.rocketmq.v2.Status;
+import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
+import apache.rocketmq.v2.SyncLiteSubscriptionResponse;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.util.Durations;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.LitePushConsumer;
+import 
org.apache.rocketmq.client.java.exception.LiteSubscriptionQuotaExceededException;
+import org.apache.rocketmq.client.java.exception.StatusChecker;
+import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LitePushConsumerImpl extends PushConsumerImpl implements 
LitePushConsumer {
+    private static final Logger log = 
LoggerFactory.getLogger(LitePushConsumerImpl.class);
+
+    private volatile ScheduledFuture<?> syncAllScheduledFuture;
+    private final LitePushConsumerSettings litePushConsumerSettings;
+
+    public LitePushConsumerImpl(LitePushConsumerBuilderImpl builder) {
+        super(builder.clientConfiguration, builder.consumerGroup, 
builder.subscriptionExpressions,
+            builder.messageListener, builder.maxCacheMessageCount, 
builder.maxCacheMessageSizeInBytes,
+            builder.consumptionThreadCount, false);
+        this.litePushConsumerSettings = new LitePushConsumerSettings(builder, 
clientId, endpoints);
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+        super.startUp();
+        syncAllScheduledFuture = getScheduler().scheduleWithFixedDelay(() -> {
+            try {
+                syncAllLiteSubscription();
+            } catch (Throwable t) {
+                log.error("Schedule syncAllLiteSubscription error, 
clientId={}", clientId, t);
+            }
+        }, 30, 30, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected void shutDown() throws InterruptedException {
+        super.shutDown();
+        if (null != syncAllScheduledFuture) {
+            syncAllScheduledFuture.cancel(false);
+        }
+    }
+
+    @Override
+    public void subscribeLite(String liteTopic) throws ClientException {
+        checkRunning();
+        if (litePushConsumerSettings.containsLiteTopic(liteTopic)) {
+            return;
+        }
+        validateLiteTopic(liteTopic);
+        checkLiteSubscriptionQuota(1);
+        ListenableFuture<Void> future =
+            syncLiteSubscription(LiteSubscriptionAction.PARTIAL_ADD, 
Collections.singleton(liteTopic));
+        try {
+            handleClientFuture(future);
+        } catch (ClientException e) {
+            log.error("Failed to subscribeLite {}", liteTopic, e);
+            throw e;
+        }
+        litePushConsumerSettings.addLiteTopic(liteTopic);
+        log.info("SubscribeLite {}, topic={}, group={}, clientId={}",
+            liteTopic, litePushConsumerSettings.bindTopic.getName(), 
getConsumerGroup(), clientId);
+    }
+
+    private void checkLiteSubscriptionQuota(int delta) throws 
LiteSubscriptionQuotaExceededException {
+        int quota = litePushConsumerSettings.getLiteSubscriptionQuota();
+        if (litePushConsumerSettings.getLiteTopicSetSize() + delta > quota) {
+            throw new LiteSubscriptionQuotaExceededException(
+                Code.LITE_SUBSCRIPTION_QUOTA_EXCEEDED_VALUE, null, "Lite 
subscription quota exceeded " + quota);
+        }
+    }
+
+    private void validateLiteTopic(String liteTopic) {
+        if (StringUtils.isBlank(liteTopic)) {
+            throw new IllegalArgumentException("liteTopic is blank");
+        }
+        if (liteTopic.length() > 
litePushConsumerSettings.getMaxLiteTopicSize()) {
+            String errorMessage = String.format("liteTopic length exceeded max 
length %d, liteTopic: %s",
+                litePushConsumerSettings.getMaxLiteTopicSize(), liteTopic);
+            throw new IllegalArgumentException(errorMessage);
+        }
+    }
+
+    @Override
+    public void unsubscribeLite(String liteTopic) throws ClientException {
+        checkRunning();
+        if (!litePushConsumerSettings.containsLiteTopic(liteTopic)) {
+            return;
+        }
+        ListenableFuture<Void> future =
+            syncLiteSubscription(LiteSubscriptionAction.PARTIAL_REMOVE, 
Collections.singleton(liteTopic));
+        try {
+            handleClientFuture(future);
+        } catch (ClientException e) {
+            log.error("Failed to unsubscribeLite {}", liteTopic, e);
+            throw e;
+        }
+        litePushConsumerSettings.removeLiteTopic(liteTopic);
+        log.info("UnsubscribeLite {}, topic={}, group={}, clientId={}",
+            liteTopic, litePushConsumerSettings.bindTopic.getName(), 
getConsumerGroup(), clientId);
+    }
+
+    @Override
+    public Set<String> getLiteTopicSet() {
+        return litePushConsumerSettings.getLiteTopicSet();
+    }
+
+    @VisibleForTesting
+    protected void syncAllLiteSubscription() throws ClientException {
+        checkLiteSubscriptionQuota(0);
+        final Set<String> set = litePushConsumerSettings.getLiteTopicSet();
+        ListenableFuture<Void> future = 
syncLiteSubscription(LiteSubscriptionAction.COMPLETE_ADD, set);
+        handleClientFuture(future);
+    }
+
+    protected ListenableFuture<Void> 
syncLiteSubscription(LiteSubscriptionAction action, Collection<String> diff) {
+        SyncLiteSubscriptionRequest request = 
SyncLiteSubscriptionRequest.newBuilder()
+            .setAction(action)
+            .setTopic(litePushConsumerSettings.bindTopic.toProtobuf())
+            .setGroup(litePushConsumerSettings.group.toProtobuf())
+            .addAllLiteTopicSet(diff)
+            .build();
+        Endpoints endpoints = getEndpoints();
+        return syncLiteSubscription0(endpoints, request);
+    }
+
+    protected ListenableFuture<Void> syncLiteSubscription0(Endpoints 
endpoints, SyncLiteSubscriptionRequest request) {
+        final Duration requestTimeout = 
clientConfiguration.getRequestTimeout();
+        RpcFuture<SyncLiteSubscriptionRequest, SyncLiteSubscriptionResponse> 
future =
+            this.getClientManager().syncLiteSubscription(endpoints, request, 
requestTimeout);
+
+        return Futures.transformAsync(future, response -> {
+            final Status status = response.getStatus();
+            StatusChecker.check(status, future);
+            return Futures.immediateVoidFuture();
+        }, MoreExecutors.directExecutor());
+    }
+
+    @Override
+    public void onNotifyUnsubscribeLiteCommand(Endpoints endpoints, 
NotifyUnsubscribeLiteCommand command) {
+        String liteTopic = command.getLiteTopic();
+
+        log.info("notify unsubscribe lite liteTopic={} group={} bindTopic={}",
+            liteTopic, getConsumerGroup(), getSettings().bindTopic);
+
+        if (StringUtils.isBlank(liteTopic)) {
+            return;
+        }
+
+        litePushConsumerSettings.removeLiteTopic(liteTopic);
+    }
+
+    @Override
+    public LitePushConsumerSettings getSettings() {
+        return litePushConsumerSettings;
+    }
+
+    @Override
+    ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, 
MessageQueueImpl mq,
+        FilterExpression filterExpression, Duration longPollingTimeout, String 
attemptId) {
+        attemptId = null == attemptId ? UUID.randomUUID().toString() : 
attemptId;
+        return ReceiveMessageRequest.newBuilder()
+            .setGroup(getProtobufGroup())
+            .setMessageQueue(mq.toProtobuf())
+            
.setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
+            .setBatchSize(batchSize)
+            .setAttemptId(attemptId)
+            .setAutoRenew(true)
+            .build();
+    }
+
+    @VisibleForTesting
+    protected void checkRunning() {
+        if (!this.isRunning()) {
+            log.error("lite push consumer not running, state={}, clientId={}",
+                this.state(), clientId);
+            throw new IllegalStateException("lite push consumer not running");
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerSettings.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerSettings.java
new file mode 100644
index 00000000..c76a6690
--- /dev/null
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerSettings.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import apache.rocketmq.v2.Subscription;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.java.impl.ClientType;
+import org.apache.rocketmq.client.java.message.protocol.Resource;
+import org.apache.rocketmq.client.java.misc.ClientId;
+import org.apache.rocketmq.client.java.misc.ExcludeFromJacocoGeneratedReport;
+import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Settings for a lite push consumer.
+ *
+ * <p>On top of {@link PushSubscriptionSettings} this keeps the bind topic, the
+ * set of lite topics tracked by the client, a version stamp that changes
+ * whenever that set changes, and two limits refreshed by
+ * {@link #sync(apache.rocketmq.v2.Settings)}.
+ */
+public class LitePushConsumerSettings extends PushSubscriptionSettings {
+    private static final Logger log =
+        LoggerFactory.getLogger(LitePushConsumerSettings.class);
+    // bindTopic for lite push consumer
+    final Resource bindTopic;
+    private final Set<String> liteTopicSet = ConcurrentHashMap.newKeySet();
+    /**
+     * client-side lite subscription quota limit
+     *
+     * <p>Volatile, like the synced fields of {@link PushSubscriptionSettings}:
+     * it is written in {@link #sync(apache.rocketmq.v2.Settings)} and read
+     * through a getter, which may happen on different threads.
+     */
+    private volatile int liteSubscriptionQuota;
+    /**
+     * Maximum lite topic size; 64 until a synced value replaces it. Volatile
+     * for the same reason as {@link #liteSubscriptionQuota}.
+     */
+    private volatile int maxLiteTopicSize = 64;
+
+    /**
+     * Version stamp of the lite topic set. Seeded with the wall clock and
+     * strictly increased on every effective add or remove.
+     */
+    private final AtomicLong version =
+        new AtomicLong(System.currentTimeMillis());
+
+    /**
+     * @param builder   builder supplying the client configuration, consumer
+     *                  group, subscription expressions and bind topic.
+     * @param clientId  id of the owning client.
+     * @param endpoints endpoints forwarded to the parent settings.
+     */
+    public LitePushConsumerSettings(
+        LitePushConsumerBuilderImpl builder,
+        ClientId clientId,
+        Endpoints endpoints
+    ) {
+        super(builder.clientConfiguration, clientId,
+            ClientType.LITE_PUSH_CONSUMER, endpoints, builder.consumerGroup,
+            builder.subscriptionExpressions);
+        this.bindTopic = new Resource(namespace, builder.bindTopic);
+    }
+
+    /**
+     * @param liteTopic lite topic to look up.
+     * @return true if the lite topic is currently in the set.
+     */
+    public boolean containsLiteTopic(String liteTopic) {
+        return liteTopicSet.contains(liteTopic);
+    }
+
+    /**
+     * Adds a lite topic; the version changes only if the set changed.
+     *
+     * @param liteTopic lite topic to add.
+     */
+    public void addLiteTopic(String liteTopic) {
+        if (liteTopicSet.add(liteTopic)) {
+            bumpVersion();
+        }
+    }
+
+    /**
+     * Removes a lite topic; the version changes only if the set changed.
+     *
+     * @param liteTopic lite topic to remove.
+     */
+    public void removeLiteTopic(String liteTopic) {
+        if (liteTopicSet.remove(liteTopic)) {
+            bumpVersion();
+        }
+    }
+
+    /**
+     * Advances the version to the current wall-clock time, or to the previous
+     * value plus one when the clock has not moved past it. This keeps two
+     * changes within the same millisecond (or a clock step backwards) from
+     * leaving the version unchanged or lower than before.
+     */
+    private void bumpVersion() {
+        version.updateAndGet(
+            prev -> Math.max(prev + 1, System.currentTimeMillis()));
+    }
+
+    /**
+     * @return an immutable snapshot of the lite topic set.
+     */
+    public Set<String> getLiteTopicSet() {
+        return ImmutableSet.copyOf(liteTopicSet);
+    }
+
+    public int getLiteSubscriptionQuota() {
+        return liteSubscriptionQuota;
+    }
+
+    public int getMaxLiteTopicSize() {
+        return maxLiteTopicSize;
+    }
+
+    /**
+     * @return the current number of lite topics in the set.
+     */
+    public int getLiteTopicSetSize() {
+        return liteTopicSet.size();
+    }
+
+    /**
+     * @return the current version stamp of the lite topic set.
+     */
+    public long getVersion() {
+        return version.get();
+    }
+
+    /**
+     * Applies the parent synchronization, then takes over the lite
+     * subscription quota and the max lite topic size from the subscription
+     * part of the settings when they are present there.
+     *
+     * @param settings protobuf settings to synchronize from.
+     */
+    @Override
+    public void sync(apache.rocketmq.v2.Settings settings) {
+        super.sync(settings);
+        final Subscription subscription = settings.getSubscription();
+        if (subscription.hasLiteSubscriptionQuota()) {
+            this.liteSubscriptionQuota =
+                subscription.getLiteSubscriptionQuota();
+        }
+        if (subscription.hasMaxLiteTopicSize()) {
+            this.maxLiteTopicSize = subscription.getMaxLiteTopicSize();
+        }
+    }
+
+    @ExcludeFromJacocoGeneratedReport
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("clientId", clientId)
+            .add("clientType", clientType)
+            .add("accessPoint", accessPoint)
+            .add("retryPolicy", retryPolicy)
+            .add("requestTimeout", requestTimeout)
+            .add("group", group)
+            .add("receiveBatchSize", receiveBatchSize)
+            .add("longPollingTimeout", longPollingTimeout)
+            // for lite
+            .add("bindTopic", bindTopic)
+            .add("liteSubscriptionQuota", liteSubscriptionQuota)
+            .add("maxLiteTopicSize", maxLiteTopicSize)
+            .add("interestSet", liteTopicSet)
+            .add("version", version)
+            .toString();
+    }
+}
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 6e9146db..c42acda6 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -134,7 +134,7 @@ class ProcessQueueImpl implements ProcessQueue {
 
     @Override
     public boolean expired() {
-        final Duration longPollingTimeout = 
consumer.getPushConsumerSettings().getLongPollingTimeout();
+        final Duration longPollingTimeout = 
consumer.getSettings().getLongPollingTimeout();
         final Duration requestTimeout = 
consumer.getClientConfiguration().getRequestTimeout();
         final Duration maxIdleDuration = 
longPollingTimeout.plus(requestTimeout).multipliedBy(3);
         final Duration idleDuration = Duration.ofNanos(System.nanoTime() - 
activityNanoTime);
@@ -165,7 +165,7 @@ class ProcessQueueImpl implements ProcessQueue {
     private int getReceptionBatchSize() {
         int bufferSize = consumer.cacheMessageCountThresholdPerQueue() - 
this.cachedMessagesCount();
         bufferSize = Math.max(bufferSize, 1);
-        return Math.min(bufferSize, 
consumer.getPushConsumerSettings().getReceiveBatchSize());
+        return Math.min(bufferSize, 
consumer.getSettings().getReceiveBatchSize());
     }
 
     @Override
@@ -235,7 +235,7 @@ class ProcessQueueImpl implements ProcessQueue {
         try {
             final Endpoints endpoints = mq.getBroker().getEndpoints();
             final int batchSize = this.getReceptionBatchSize();
-            final Duration longPollingTimeout = 
consumer.getPushConsumerSettings().getLongPollingTimeout();
+            final Duration longPollingTimeout = 
consumer.getSettings().getLongPollingTimeout();
             final ReceiveMessageRequest request = 
consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
                 longPollingTimeout, attemptId);
             activityNanoTime = System.nanoTime();
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index c43c3d27..9319b235 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -17,11 +17,9 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
-import apache.rocketmq.v2.ClientType;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
 import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
-import apache.rocketmq.v2.HeartbeatRequest;
 import apache.rocketmq.v2.QueryAssignmentRequest;
 import apache.rocketmq.v2.QueryAssignmentResponse;
 import apache.rocketmq.v2.Status;
@@ -63,11 +61,10 @@ import 
org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.hook.MessageInterceptorContext;
 import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
-import org.apache.rocketmq.client.java.impl.Settings;
+import org.apache.rocketmq.client.java.impl.ClientType;
 import org.apache.rocketmq.client.java.message.GeneralMessage;
 import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
-import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.metrics.GaugeObserver;
 import org.apache.rocketmq.client.java.misc.ExcludeFromJacocoGeneratedReport;
 import org.apache.rocketmq.client.java.misc.ExecutorServices;
@@ -95,7 +92,6 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
     final AtomicLong consumptionOkQuantity;
     final AtomicLong consumptionErrorQuantity;
 
-    private final ClientConfiguration clientConfiguration;
     private final PushSubscriptionSettings pushSubscriptionSettings;
     private final String consumerGroup;
     private final Map<String /* topic */, FilterExpression> 
subscriptionExpressions;
@@ -139,10 +135,8 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
         int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int 
consumptionThreadCount,
         boolean enableFifoConsumeAccelerator, boolean 
enableMessageInterceptorFiltering) {
         super(clientConfiguration, consumerGroup, 
subscriptionExpressions.keySet());
-        this.clientConfiguration = clientConfiguration;
-        Resource groupResource = new 
Resource(clientConfiguration.getNamespace(), consumerGroup);
-        this.pushSubscriptionSettings = new 
PushSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
-            endpoints, groupResource, clientConfiguration.getRequestTimeout(), 
subscriptionExpressions);
+        this.pushSubscriptionSettings = new 
PushSubscriptionSettings(clientConfiguration, clientId,
+            ClientType.PUSH_CONSUMER, endpoints, consumerGroup, 
subscriptionExpressions);
         this.consumerGroup = consumerGroup;
         this.subscriptionExpressions = subscriptionExpressions;
         this.cacheAssignments = new ConcurrentHashMap<>();
@@ -181,7 +175,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
     @Override
     protected void startUp() throws Exception {
         try {
-            log.info("Begin to start the rocketmq push consumer, clientId={}", 
clientId);
+            log.info("Begin to start the rocketmq {}, clientId={}", 
getSettings().getClientType(), clientId);
             GaugeObserver gaugeObserver = new 
ProcessQueueGaugeObserver(processQueueTable, clientId, consumerGroup);
             this.clientMeterManager.setGaugeObserver(gaugeObserver);
             super.startUp();
@@ -195,9 +189,10 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
                     log.error("Exception raised while scanning the load 
assignments, clientId={}", clientId, t);
                 }
             }, 1, 5, TimeUnit.SECONDS);
-            log.info("The rocketmq push consumer starts successfully, 
clientId={}", clientId);
+            log.info("The rocketmq {} starts successfully, clientId={}", 
getSettings().getClientType(), clientId);
         } catch (Throwable t) {
-            log.error("Exception raised while starting the rocketmq push 
consumer, clientId={}", clientId, t);
+            log.error("Exception raised while starting the rocketmq {}, 
clientId={}",
+                getSettings().getClientType(), clientId, t);
             shutDown();
             throw t;
         }
@@ -214,7 +209,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
      */
     @Override
     protected void shutDown() throws InterruptedException {
-        log.info("Begin to shutdown the rocketmq push consumer, clientId={}", 
clientId);
+        log.info("Begin to shutdown the rocketmq {}, clientId={}", 
getSettings().getClientType(), clientId);
         if (null != scanAssignmentsFuture) {
             scanAssignmentsFuture.cancel(false);
         }
@@ -225,12 +220,12 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
         ExecutorServices.awaitTerminated(consumptionExecutor);
         TimeUnit.SECONDS.sleep(1);
         super.shutDown();
-        log.info("Shutdown the rocketmq push consumer successfully, 
clientId={}", clientId);
+        log.info("Shutdown the rocketmq {} successfully, clientId={}", 
getSettings().getClientType(), clientId);
     }
 
     private void waitingReceiveRequestFinished() {
         Duration maxWaitingTime = clientConfiguration.getRequestTimeout()
-            .plus(pushSubscriptionSettings.getLongPollingTimeout());
+            .plus(getSettings().getLongPollingTimeout());
         long endTime = System.currentTimeMillis() + maxWaitingTime.toMillis();
         try {
             while (true) {
@@ -251,9 +246,9 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
         }
     }
 
-    private ConsumeService createConsumeService() {
+    protected ConsumeService createConsumeService() {
         final ScheduledExecutorService scheduler = 
this.getClientManager().getScheduler();
-        if (pushSubscriptionSettings.isFifo()) {
+        if (getSettings().isFifo()) {
             log.info("Create FIFO consume service, consumerGroup={}, 
clientId={}, enableFifoConsumeAccelerator={}",
                 consumerGroup, clientId, enableFifoConsumeAccelerator);
             return new FifoConsumeService(clientId, messageListener, 
consumptionExecutor, this,
@@ -271,10 +266,6 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
         return consumerGroup;
     }
 
-    public PushSubscriptionSettings getPushConsumerSettings() {
-        return pushSubscriptionSettings;
-    }
-
     /**
      * @see PushConsumer#getSubscriptionExpressions()
      */
@@ -386,13 +377,6 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
         return Optional.of(processQueue);
     }
 
-    @Override
-    public HeartbeatRequest wrapHeartbeatRequest() {
-        return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup())
-            .setClientType(ClientType.PUSH_CONSUMER).build();
-    }
-
-
     @VisibleForTesting
     void syncProcessQueue(String topic, Assignments assignments, 
FilterExpression filterExpression) {
         Set<MessageQueueImpl> latest = new HashSet<>();
@@ -486,7 +470,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
     }
 
     @Override
-    public Settings getSettings() {
+    public PushSubscriptionSettings getSettings() {
         return pushSubscriptionSettings;
     }
 
@@ -573,11 +557,18 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
                 .setResourceNamespace(clientConfiguration.getNamespace())
                 .setName(messageView.getTopic())
                 .build();
-        return 
ForwardMessageToDeadLetterQueueRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
+
+        ForwardMessageToDeadLetterQueueRequest.Builder builder = 
ForwardMessageToDeadLetterQueueRequest.newBuilder()
+            .setGroup(getProtobufGroup())
+            .setTopic(topicResource)
             .setReceiptHandle(messageView.getReceiptHandle())
             .setMessageId(messageView.getMessageId().toString())
             .setDeliveryAttempt(messageView.getDeliveryAttempt())
-            .setMaxDeliveryAttempts(getRetryPolicy().getMaxAttempts()).build();
+            .setMaxDeliveryAttempts(getRetryPolicy().getMaxAttempts());
+        if (ClientType.LITE_PUSH_CONSUMER == getSettings().getClientType()) {
+            messageView.getLiteTopic().ifPresent(builder::setLiteTopic);
+        }
+        return builder.build();
     }
 
     public RpcFuture<ForwardMessageToDeadLetterQueueRequest, 
ForwardMessageToDeadLetterQueueResponse>
@@ -630,7 +621,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
     }
 
     public RetryPolicy getRetryPolicy() {
-        return pushSubscriptionSettings.getRetryPolicy();
+        return getSettings().getRetryPolicy();
     }
 
     public ThreadPoolExecutor getConsumptionExecutor() {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
index 26a66a18..624490d4 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
@@ -27,6 +27,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
 import org.apache.rocketmq.client.java.impl.ClientType;
@@ -44,16 +45,22 @@ import org.slf4j.LoggerFactory;
 public class PushSubscriptionSettings extends Settings {
     private static final Logger log = 
LoggerFactory.getLogger(PushSubscriptionSettings.class);
 
-    private final Resource group;
-    private final Map<String, FilterExpression> subscriptionExpressions;
-    private volatile Boolean fifo = false;
-    private volatile int receiveBatchSize = 32;
-    private volatile Duration longPollingTimeout = Duration.ofSeconds(30);
+    protected final Resource group;
+    protected final Map<String, FilterExpression> subscriptionExpressions;
+    protected volatile Boolean fifo = false;
+    protected volatile int receiveBatchSize = 32;
+    protected volatile Duration longPollingTimeout = Duration.ofSeconds(30);
 
-    public PushSubscriptionSettings(String namespace, ClientId clientId, 
Endpoints endpoints, Resource group,
-        Duration requestTimeout, Map<String, FilterExpression> 
subscriptionExpression) {
-        super(namespace, clientId, ClientType.PUSH_CONSUMER, endpoints, 
requestTimeout);
-        this.group = group;
+    public PushSubscriptionSettings(
+        ClientConfiguration configuration,
+        ClientId clientId,
+        ClientType clientType,
+        Endpoints endpoints,
+        String group,
+        Map<String, FilterExpression> subscriptionExpression
+    ) {
+        super(configuration.getNamespace(), clientId, clientType, endpoints, 
configuration.getRequestTimeout());
+        this.group = new Resource(configuration.getNamespace(), group);
         this.subscriptionExpressions = subscriptionExpression;
     }
 
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index e73774e5..0ffa7928 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -21,8 +21,6 @@ import apache.rocketmq.v2.AckMessageRequest;
 import apache.rocketmq.v2.AckMessageResponse;
 import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
 import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
-import apache.rocketmq.v2.ClientType;
-import apache.rocketmq.v2.HeartbeatRequest;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.Status;
 import com.google.common.math.IntMath;
@@ -154,12 +152,6 @@ class SimpleConsumerImpl extends ConsumerImpl implements 
SimpleConsumer {
         return new HashMap<>(subscriptionExpressions);
     }
 
-    @Override
-    public HeartbeatRequest wrapHeartbeatRequest() {
-        return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup())
-            .setClientType(ClientType.SIMPLE_CONSUMER).build();
-    }
-
     /**
      * @see SimpleConsumer#receive(int, Duration)
      */
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
index b6f9c559..924e903d 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
+import apache.rocketmq.v2.NotifyUnsubscribeLiteCommand;
 import apache.rocketmq.v2.PrintThreadStackTraceCommand;
 import apache.rocketmq.v2.ReconnectEndpointsCommand;
 import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
@@ -103,4 +104,9 @@ public interface ClientSessionHandler {
      * Event processor for {@link ReconnectEndpointsCommand}.
      */
     void onReconnectEndpointsCommand(Endpoints endpoints, 
ReconnectEndpointsCommand command);
+
+    /**
+     * Event processor for {@link NotifyUnsubscribeLiteCommand}.
+     */
+    void onNotifyUnsubscribeLiteCommand(Endpoints endpoints, 
NotifyUnsubscribeLiteCommand command);
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
index 2e614fd5..827d8654 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
@@ -81,6 +81,13 @@ public interface GeneralMessage {
      */
     Optional<String> getMessageGroup();
 
+    /**
+     * Get the lite topic, which makes sense only when the topic type is LITE.
+     *
+     * @return lite topic, which is optional, {@link Optional#empty()} means 
lite topic is not specified.
+     */
+    Optional<String> getLiteTopic();
+
     /**
      * Get the expected delivery timestamp, which makes sense only when topic 
type is delay.
      *
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
index ed46be42..ffc390e5 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
@@ -35,6 +35,7 @@ public class GeneralMessageImpl implements GeneralMessage {
     private final String tag;
     private final Collection<String> keys;
     private final String messageGroup;
+    private final String liteTopic;
     private final Long deliveryTimestamp;
     private final String bornHost;
     private final Long bornTimestamp;
@@ -58,6 +59,7 @@ public class GeneralMessageImpl implements GeneralMessage {
         this.tag = message.getTag().orElse(null);
         this.keys = message.getKeys();
         this.messageGroup = message.getMessageGroup().orElse(null);
+        this.liteTopic = message.getLiteTopic().orElse(null);
         this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
         this.bornHost = null;
         this.bornTimestamp = null;
@@ -92,6 +94,7 @@ public class GeneralMessageImpl implements GeneralMessage {
         this.tag = message.getTag().orElse(null);
         this.keys = message.getKeys();
         this.messageGroup = message.getMessageGroup().orElse(null);
+        this.liteTopic = message.getLiteTopic().orElse(null);
         this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
         this.bornHost = message.getBornHost();
         this.bornTimestamp = message.getBornTimestamp();
@@ -136,6 +139,11 @@ public class GeneralMessageImpl implements GeneralMessage {
         return Optional.ofNullable(messageGroup);
     }
 
+    /**
+     * @see GeneralMessage#getLiteTopic()
+     */
+    @Override
+    public Optional<String> getLiteTopic() {
+        return Optional.ofNullable(this.liteTopic);
+    }
+
     @Override
     public Optional<Long> getDeliveryTimestamp() {
         return Optional.ofNullable(deliveryTimestamp);
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
index f5a066b2..21fcc132 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
@@ -35,13 +35,14 @@ import 
org.apache.rocketmq.client.apis.message.MessageBuilder;
 public class MessageBuilderImpl implements MessageBuilder {
     public static final Pattern TOPIC_PATTERN = 
Pattern.compile("^[%a-zA-Z0-9_-]+$");
 
-    private String topic = null;
-    private byte[] body = null;
-    private String tag = null;
-    private String messageGroup = null;
-    private Long deliveryTimestamp = null;
-    private Collection<String> keys = new HashSet<>();
-    private final Map<String, String> properties = new HashMap<>();
+    protected String topic = null;
+    protected byte[] body = null;
+    protected String tag = null;
+    protected String messageGroup = null;
+    protected String liteTopic = null;
+    protected Long deliveryTimestamp = null;
+    protected Collection<String> keys = new HashSet<>();
+    protected final Map<String, String> properties = new HashMap<>();
 
     public MessageBuilderImpl() {
     }
@@ -98,17 +99,28 @@ public class MessageBuilderImpl implements MessageBuilder {
     @Override
     public MessageBuilder setMessageGroup(String messageGroup) {
         checkArgument(null == deliveryTimestamp, "messageGroup and 
deliveryTimestamp should not be set at same time");
+        checkArgument(null == liteTopic, "messageGroup and liteTopic should 
not be set at same time");
         checkArgument(StringUtils.isNotBlank(messageGroup), "messageGroup 
should not be blank");
         this.messageGroup = messageGroup;
         return this;
     }
 
+    /**
+     * See {@link MessageBuilder#setLiteTopic(String)}
+     */
+    @Override
+    public MessageBuilder setLiteTopic(String liteTopic) {
+        // liteTopic excludes both deliveryTimestamp and messageGroup.
+        checkArgument(null == deliveryTimestamp,
+            "liteTopic and deliveryTimestamp should not be set at same time");
+        checkArgument(null == messageGroup,
+            "liteTopic and messageGroup should not be set at same time");
+        checkArgument(StringUtils.isNotBlank(liteTopic),
+            "liteTopic should not be blank");
+        this.liteTopic = liteTopic;
+        return this;
+    }
+
     /**
      * See {@link MessageBuilder#setDeliveryTimestamp(long)}
      */
     @Override
     public MessageBuilder setDeliveryTimestamp(long deliveryTimestamp) {
         checkArgument(null == messageGroup, "deliveryTimestamp and 
messageGroup should not be set at same time");
+        checkArgument(null == liteTopic, "deliveryTimestamp and liteTopic 
should not be set at same time");
         this.deliveryTimestamp = deliveryTimestamp;
         return this;
     }
@@ -131,6 +143,6 @@ public class MessageBuilderImpl implements MessageBuilder {
     public Message build() {
         checkNotNull(topic, "topic has not been set yet");
         checkNotNull(body, "body has not been set yet");
-        return new MessageImpl(topic, body, tag, keys, messageGroup, 
deliveryTimestamp, properties);
+        return new MessageImpl(this);
     }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
index 581880d7..adbec6c6 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
@@ -46,22 +46,23 @@ public class MessageImpl implements Message {
     @Nullable
     private final String messageGroup;
     @Nullable
+    private final String liteTopic;
+    @Nullable
     private final Long deliveryTimestamp;
 
     /**
      * The caller is supposed to have validated the arguments and handled 
throwing exception or
      * logging warnings already, so we avoid repeating args check here.
      */
-    MessageImpl(String topic, byte[] body, @Nullable String tag, 
Collection<String> keys,
-        @Nullable String messageGroup, @Nullable Long deliveryTimestamp,
-        Map<String, String> properties) {
-        this.topic = topic;
-        this.body = body;
-        this.tag = tag;
-        this.messageGroup = messageGroup;
-        this.deliveryTimestamp = deliveryTimestamp;
-        this.keys = keys;
-        this.properties = properties;
+    MessageImpl(MessageBuilderImpl builder) {
+        this.topic = builder.topic;
+        this.body = builder.body;
+        this.tag = builder.tag;
+        this.messageGroup = builder.messageGroup;
+        this.liteTopic = builder.liteTopic;
+        this.deliveryTimestamp = builder.deliveryTimestamp;
+        this.keys = builder.keys;
+        this.properties = builder.properties;
     }
 
     MessageImpl(Message message) {
@@ -79,6 +80,7 @@ public class MessageImpl implements Message {
         this.tag = message.getTag().orElse(null);
         this.messageGroup = message.getMessageGroup().orElse(null);
         this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
+        this.liteTopic = message.getLiteTopic().orElse(null);
         this.keys = message.getKeys();
         this.properties = message.getProperties();
     }
@@ -139,6 +141,11 @@ public class MessageImpl implements Message {
         return Optional.ofNullable(messageGroup);
     }
 
+    /**
+     * @see Message#getLiteTopic()
+     */
+    @Override
+    public Optional<String> getLiteTopic() {
+        return Optional.ofNullable(this.liteTopic);
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
index ab38c87e..5748149e 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.client.java.message;
 public enum MessageType {
     NORMAL,
     FIFO,
+    LITE,
     DELAY,
     TRANSACTION;
 
@@ -29,6 +30,8 @@ public enum MessageType {
                 return MessageType.NORMAL;
             case FIFO:
                 return MessageType.FIFO;
+            case LITE:
+                return MessageType.LITE;
             case DELAY:
                 return MessageType.DELAY;
             case TRANSACTION:
@@ -45,6 +48,8 @@ public enum MessageType {
                 return apache.rocketmq.v2.MessageType.NORMAL;
             case FIFO:
                 return apache.rocketmq.v2.MessageType.FIFO;
+            case LITE:
+                return apache.rocketmq.v2.MessageType.LITE;
             case DELAY:
                 return apache.rocketmq.v2.MessageType.DELAY;
             case TRANSACTION:
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
index aa9db400..420e3d35 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
@@ -51,6 +51,7 @@ public class MessageViewImpl implements MessageView {
     private final String topic;
     private final String tag;
     private final String messageGroup;
+    private final String liteTopic;
     private final Long deliveryTimestamp;
     private final Collection<String> keys;
     private final Map<String, String> properties;
@@ -65,7 +66,8 @@ public class MessageViewImpl implements MessageView {
     private final long decodeTimestamp;
     private final Long transportDeliveryTimestamp;
 
-    public MessageViewImpl(MessageId messageId, String topic, byte[] body, 
String tag, String messageGroup,
+    public MessageViewImpl(MessageId messageId, String topic, byte[] body, 
String tag,
+        String messageGroup, String liteTopic,
         Long deliveryTimestamp, Collection<String> keys, Map<String, String> 
properties,
         String bornHost, long bornTimestamp, int deliveryAttempt, 
MessageQueueImpl messageQueue,
         String receiptHandle, long offset, boolean corrupted,
@@ -75,6 +77,7 @@ public class MessageViewImpl implements MessageView {
         this.body = checkNotNull(body, "body should not be null");
         this.tag = tag;
         this.messageGroup = messageGroup;
+        this.liteTopic = liteTopic;
         this.deliveryTimestamp = deliveryTimestamp;
         this.keys = checkNotNull(keys, "keys should not be null");
         this.properties = checkNotNull(properties, "properties should not be 
null");
@@ -146,6 +149,14 @@ public class MessageViewImpl implements MessageView {
         return Optional.ofNullable(messageGroup);
     }
 
+    /**
+     * @see MessageView#getLiteTopic()
+     */
+    @Override
+    public Optional<String> getLiteTopic() {
+        return Optional.ofNullable(liteTopic);
+    }
+
     /**
      * @see MessageView#getDeliveryTimestamp()
      */
@@ -289,6 +300,7 @@ public class MessageViewImpl implements MessageView {
 
         String tag = systemProperties.hasTag() ? systemProperties.getTag() : 
null;
         String messageGroup = systemProperties.hasMessageGroup() ? 
systemProperties.getMessageGroup() : null;
+        String liteTopic = systemProperties.hasLiteTopic() ? 
systemProperties.getLiteTopic() : null;
         Long deliveryTimestamp = systemProperties.hasDeliveryTimestamp() ?
             Timestamps.toMillis(systemProperties.getDeliveryTimestamp()) : 
null;
         final ProtocolStringList keys = systemProperties.getKeysList();
@@ -298,8 +310,9 @@ public class MessageViewImpl implements MessageView {
         final long offset = systemProperties.getQueueOffset();
         final Map<String, String> properties = message.getUserPropertiesMap();
         final String receiptHandle = systemProperties.getReceiptHandle();
-        return new MessageViewImpl(messageId, topic, body, tag, messageGroup, 
deliveryTimestamp, keys, properties,
-            bornHost, bornTimestamp, deliveryAttempt, mq, receiptHandle, 
offset, corrupted, transportDeliveryTimestamp);
+        return new MessageViewImpl(messageId, topic, body, tag, messageGroup, 
liteTopic, deliveryTimestamp,
+            keys, properties, bornHost, bornTimestamp, deliveryAttempt,
+            mq, receiptHandle, offset, corrupted, transportDeliveryTimestamp);
     }
 
     @Override
@@ -314,6 +327,7 @@ public class MessageViewImpl implements MessageView {
             .add("tag", tag)
             .add("keys", keys)
             .add("messageGroup", messageGroup)
+            .add("liteTopic", liteTopic)
             .add("deliveryTimestamp", deliveryTimestamp)
             .add("properties", properties)
             .toString();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 6af6d737..96987a78 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -49,6 +49,7 @@ public class PublishingMessageImpl extends MessageImpl {
         this.messageId = MessageIdCodec.getInstance().nextMessageId();
         // Normal message.
         if (!message.getMessageGroup().isPresent() &&
+            !message.getLiteTopic().isPresent() &&
             !message.getDeliveryTimestamp().isPresent() && !txEnabled) {
             messageType = MessageType.NORMAL;
             return;
@@ -58,6 +59,11 @@ public class PublishingMessageImpl extends MessageImpl {
             messageType = MessageType.FIFO;
             return;
         }
+        // Lite message.
+        if (message.getLiteTopic().isPresent() && !txEnabled) {
+            messageType = MessageType.LITE;
+            return;
+        }
         // Delay message.
         if (message.getDeliveryTimestamp().isPresent() && !txEnabled) {
             messageType = MessageType.DELAY;
@@ -111,6 +117,7 @@ public class PublishingMessageImpl extends MessageImpl {
             .ifPresent(millis -> 
systemPropertiesBuilder.setDeliveryTimestamp(Timestamps.fromMillis(millis)));
         // Message group
         
this.getMessageGroup().ifPresent(systemPropertiesBuilder::setMessageGroup);
+        this.getLiteTopic().ifPresent(systemPropertiesBuilder::setLiteTopic);
         final SystemProperties systemProperties = 
systemPropertiesBuilder.build();
         Resource topicResource = 
Resource.newBuilder().setResourceNamespace(namespace).setName(getTopic()).build();
         return apache.rocketmq.v2.Message.newBuilder()
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
index 4b734615..0f1ecb3e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
@@ -49,6 +49,11 @@ public class Endpoints {
     private final String facade;
     private final List<Address> addresses;
 
+    /**
+     * Cache the hash code for the object
+     */
+    private int hash; // Default to 0
+
     public Endpoints(apache.rocketmq.v2.Endpoints endpoints) {
         this.addresses = new ArrayList<>();
         for (apache.rocketmq.v2.Address address : 
endpoints.getAddressesList()) {
@@ -202,6 +207,9 @@ public class Endpoints {
 
     @Override
     public int hashCode() {
-        return Objects.hashCode(scheme, facade, addresses);
+        if (hash == 0) {
+            hash = Objects.hashCode(scheme, facade, addresses);
+        }
+        return hash;
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
index c555082f..98f252a5 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
@@ -39,6 +39,8 @@ import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
+import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
+import apache.rocketmq.v2.SyncLiteSubscriptionResponse;
 import apache.rocketmq.v2.TelemetryCommand;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.grpc.Metadata;
@@ -200,6 +202,18 @@ public interface RpcClient {
     ListenableFuture<RecallMessageResponse> recallMessage(Metadata metadata,
         RecallMessageRequest request, Executor executor, Duration duration);
 
+    /**
+     * Sync lite subscription asynchronously.
+     *
+     * @param metadata gRPC request header metadata.
+     * @param request  sync lite subscription request
+     * @param executor gRPC asynchronous executor.
+     * @param duration request max duration.
+     * @return invocation of response future.
+     */
+    ListenableFuture<SyncLiteSubscriptionResponse> 
syncLiteSubscription(Metadata metadata,
+        SyncLiteSubscriptionRequest request, Executor executor, Duration 
duration);
+
     /**
      * Start a streaming request and get the request observer.
      *
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index f71dfb29..1e0225a1 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -40,6 +40,8 @@ import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
+import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
+import apache.rocketmq.v2.SyncLiteSubscriptionResponse;
 import apache.rocketmq.v2.TelemetryCommand;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -228,6 +230,16 @@ public class RpcClientImpl implements RpcClient {
             .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).recallMessage(request);
     }
 
+    @Override
+    public ListenableFuture<SyncLiteSubscriptionResponse> 
syncLiteSubscription(Metadata metadata,
+        SyncLiteSubscriptionRequest request, Executor executor, Duration 
duration) {
+        this.activityNanoTime = System.nanoTime();
+        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
+            .withExecutor(executor)
+            .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS)
+            .syncLiteSubscription(request);
+    }
+
     @Override
     public StreamObserver<TelemetryCommand> telemetry(Metadata metadata, 
Executor executor, Duration duration,
         StreamObserver<TelemetryCommand> responseObserver) {
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
index ca198d9d..cf3669ea 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.client.java;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import 
org.apache.rocketmq.client.java.impl.consumer.LitePushConsumerBuilderImpl;
 import org.apache.rocketmq.client.java.impl.consumer.PushConsumerBuilderImpl;
 import org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerBuilderImpl;
 import org.apache.rocketmq.client.java.impl.producer.ProducerBuilderImpl;
@@ -39,6 +40,12 @@ public class ClientServiceProviderImplTest {
             
ClientServiceProvider.loadService().newPushConsumerBuilder().getClass());
     }
 
+    @Test
+    public void testNewLitePushConsumerBuilder() {
+        assertEquals(LitePushConsumerBuilderImpl.class,
+            
ClientServiceProvider.loadService().newLitePushConsumerBuilder().getClass());
+    }
+
     @Test
     public void testNewSimpleConsumerBuilder() {
         assertEquals(SimpleConsumerBuilderImpl.class,
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/exception/StatusCheckerTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/exception/StatusCheckerTest.java
index 13ec8e37..56605011 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/exception/StatusCheckerTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/exception/StatusCheckerTest.java
@@ -110,6 +110,17 @@ public class StatusCheckerTest extends TestBase {
             }
         }
 
+        {
+            Status status = 
Status.newBuilder().setCode(Code.ILLEGAL_LITE_TOPIC).build();
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
+            try {
+                StatusChecker.check(status, invocation);
+                fail();
+            } catch (BadRequestException ignore) {
+                // ignore on purpose
+            }
+        }
+
         {
             Status status = 
Status.newBuilder().setCode(Code.ILLEGAL_MESSAGE_TAG).build();
             RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
@@ -431,6 +442,22 @@ public class StatusCheckerTest extends TestBase {
         }
     }
 
+    @Test(expected = LiteTopicQuotaExceededException.class)
+    public void testLiteTopicQuotaExceeded() throws ClientException {
+        Status status = 
Status.newBuilder().setCode(Code.LITE_TOPIC_QUOTA_EXCEEDED).build();
+        final Context context = generateContext();
+        RpcFuture<Object, Object> invocation = new RpcFuture<>(context, null, 
null);
+        StatusChecker.check(status, invocation);
+    }
+
+    @Test(expected = LiteSubscriptionQuotaExceededException.class)
+    public void testLiteSubscriptionQuotaExceeded() throws ClientException {
+        Status status = 
Status.newBuilder().setCode(Code.LITE_SUBSCRIPTION_QUOTA_EXCEEDED).build();
+        final Context context = generateContext();
+        RpcFuture<Object, Object> invocation = new RpcFuture<>(context, null, 
null);
+        StatusChecker.check(status, invocation);
+    }
+
     @Test
     public void testRequestHeaderFieldsTooLarge() throws ClientException {
         Object response = new Object();
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
index b6ac2ec3..a36c60c7 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
@@ -28,6 +28,7 @@ import apache.rocketmq.v2.QueryRouteRequest;
 import apache.rocketmq.v2.RecallMessageRequest;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.SendMessageRequest;
+import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
 import io.grpc.Metadata;
 import java.time.Duration;
 import org.apache.rocketmq.client.java.misc.ClientId;
@@ -144,4 +145,13 @@ public class ClientManagerImplTest extends TestBase {
         CLIENT_MANAGER.recallMessage(null, request, Duration.ofSeconds(1));
         // Expect no exception thrown.
     }
+
+    @Test
+    public void testSyncLiteSubscription() {
+        SyncLiteSubscriptionRequest request = 
SyncLiteSubscriptionRequest.newBuilder().build();
+        CLIENT_MANAGER.syncLiteSubscription(fakeEndpoints(), request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.syncLiteSubscription(null, request, 
Duration.ofSeconds(1));
+        // Expect no exception thrown.
+    }
+
 }
\ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
index 9f7a58af..0b2e4bf3 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 
+import apache.rocketmq.v2.NotifyUnsubscribeLiteCommand;
 import apache.rocketmq.v2.PrintThreadStackTraceCommand;
 import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
 import apache.rocketmq.v2.Settings;
@@ -241,4 +242,27 @@ public class ClientSessionImplTest extends TestBase {
         Mockito.verify(requestObserver, times(1)).onCompleted();
         Mockito.verify(sessionHandler, times(1)).getScheduler();
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testOnNextWithNotifyUnsubscribeLiteCommand() throws 
ClientException {
+        final Endpoints endpoints = fakeEndpoints();
+        final ClientSessionHandler sessionHandler = 
Mockito.mock(ClientSessionHandler.class);
+        Mockito.when(sessionHandler.getScheduler()).thenReturn(new 
ScheduledThreadPoolExecutor(1));
+        final StreamObserver<TelemetryCommand> requestObserver = 
Mockito.mock(StreamObserver.class);
+        
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+            any(StreamObserver.class));
+        final ClientSessionImpl clientSession = new 
ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
+        Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
+        
Mockito.doNothing().when(sessionHandler).onNotifyUnsubscribeLiteCommand(any(Endpoints.class),
+            any(NotifyUnsubscribeLiteCommand.class));
+        NotifyUnsubscribeLiteCommand command0 = 
NotifyUnsubscribeLiteCommand.newBuilder()
+            .setLiteTopic("test-lite-topic")
+            .build();
+        TelemetryCommand command = TelemetryCommand.newBuilder()
+            .setNotifyUnsubscribeLiteCommand(command0).build();
+        clientSession.onNext(command);
+        Mockito.verify(sessionHandler, 
times(1)).onNotifyUnsubscribeLiteCommand(eq(endpoints), eq(command0));
+    }
+
 }
\ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerBuilderImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerBuilderImplTest.java
new file mode 100644
index 00000000..fdb338cb
--- /dev/null
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerBuilderImplTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Test;
+
+public class LitePushConsumerBuilderImplTest extends TestBase {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBindTopicWithNull() {
+        final LitePushConsumerBuilderImpl builder = new 
LitePushConsumerBuilderImpl();
+        builder.bindTopic(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBindTopicWithBlank() {
+        final LitePushConsumerBuilderImpl builder = new 
LitePushConsumerBuilderImpl();
+        builder.bindTopic("  ");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBindTopicWithEmpty() {
+        final LitePushConsumerBuilderImpl builder = new 
LitePushConsumerBuilderImpl();
+        builder.bindTopic("");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testSetClientConfigurationWithNull() {
+        final LitePushConsumerBuilderImpl builder = new 
LitePushConsumerBuilderImpl();
+        builder.setClientConfiguration(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testSetConsumerGroupWithNull() {
+        final LitePushConsumerBuilderImpl builder = new 
LitePushConsumerBuilderImpl();
+        builder.setConsumerGroup(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testSetMessageListenerWithNull() {
+        final LitePushConsumerBuilderImpl builder = new 
LitePushConsumerBuilderImpl();
+        builder.setMessageListener(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetNegativeMaxCacheMessageCount() {
+        final LitePushConsumerBuilderImpl builder = new 
LitePushConsumerBuilderImpl();
+        builder.setMaxCacheMessageCount(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetNegativeMaxCacheMessageSizeInBytes() {
+        final LitePushConsumerBuilderImpl builder = new 
LitePushConsumerBuilderImpl();
+        builder.setMaxCacheMessageSizeInBytes(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetNegativeConsumptionThreadCount() {
+        final LitePushConsumerBuilderImpl builder = new 
LitePushConsumerBuilderImpl();
+        builder.setConsumptionThreadCount(-1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testBuildWithoutClientConfiguration() throws ClientException {
+        final LitePushConsumerBuilderImpl builder = new 
LitePushConsumerBuilderImpl();
+        builder.setConsumerGroup(FAKE_CONSUMER_GROUP_0)
+            .setMessageListener(messageView -> ConsumeResult.SUCCESS)
+            .bindTopic("test-topic")
+            .build();
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testBuildWithoutConsumerGroup() throws ClientException {
+        final LitePushConsumerBuilderImpl builder = new 
LitePushConsumerBuilderImpl();
+        ClientConfiguration clientConfiguration =
+            
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+        builder.setClientConfiguration(clientConfiguration)
+            .setMessageListener(messageView -> ConsumeResult.SUCCESS)
+            .bindTopic("test-topic")
+            .build();
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testBuildWithoutMessageListener() throws ClientException {
+        final LitePushConsumerBuilderImpl builder = new 
LitePushConsumerBuilderImpl();
+        ClientConfiguration clientConfiguration =
+            
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+        builder.setClientConfiguration(clientConfiguration)
+            .setConsumerGroup(FAKE_CONSUMER_GROUP_0)
+            .bindTopic("test-topic")
+            .build();
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testBuildWithoutBindTopic() throws ClientException {
+        final LitePushConsumerBuilderImpl builder = new 
LitePushConsumerBuilderImpl();
+        ClientConfiguration clientConfiguration =
+            
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+        builder.setClientConfiguration(clientConfiguration)
+            .setConsumerGroup(FAKE_CONSUMER_GROUP_0)
+            .setMessageListener(messageView -> ConsumeResult.SUCCESS)
+            .build();
+    }
+
+}
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImplTest.java
new file mode 100644
index 00000000..b71ab166
--- /dev/null
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImplTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import apache.rocketmq.v2.LiteSubscriptionAction;
+import com.google.common.util.concurrent.Futures;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import 
org.apache.rocketmq.client.java.exception.LiteSubscriptionQuotaExceededException;
+import org.apache.rocketmq.client.java.misc.ClientId;
+import org.apache.rocketmq.client.java.route.Endpoints;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+public class LitePushConsumerImplTest {
+
+    final String endpoints = "127.0.0.1:8080";
+
+    LitePushConsumerSettings spySettings;
+
+    private LitePushConsumerImpl consumer;
+
+    @Before
+    public void setUp() {
+        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder().setEndpoints(endpoints).build();
+
+        LitePushConsumerBuilderImpl litePushConsumerBuilder = new 
LitePushConsumerBuilderImpl();
+        litePushConsumerBuilder.setClientConfiguration(clientConfiguration);
+
+        LitePushConsumerSettings realSettings = new 
LitePushConsumerSettings(litePushConsumerBuilder, new ClientId(),
+            new Endpoints("127.0.0.1:8080"));
+
+        spySettings = Mockito.spy(realSettings);
+
+        MockitoAnnotations.openMocks(this);
+        consumer = mock(LitePushConsumerImpl.class, CALLS_REAL_METHODS);
+        // Set final field litePushConsumerSettings using reflection
+        try {
+            java.lang.reflect.Field field = 
LitePushConsumerImpl.class.getDeclaredField("litePushConsumerSettings");
+            field.setAccessible(true);
+            field.set(consumer, spySettings);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testSubscribeLiteNotRunning() throws ClientException {
+        String liteTopic = "testLiteTopic";
+        doThrow(new IllegalStateException("not 
running")).when(consumer).checkRunning();
+
+        consumer.subscribeLite(liteTopic);
+    }
+
+    @Test
+    public void testSubscribeLiteAlreadySubscribed() throws ClientException {
+        String liteTopic = "testLiteTopic";
+        doNothing().when(consumer).checkRunning();
+        when(spySettings.containsLiteTopic(liteTopic)).thenReturn(true);
+
+        consumer.subscribeLite(liteTopic);
+
+        verify(consumer).checkRunning();
+        verify(spySettings).containsLiteTopic(liteTopic);
+        verify(consumer, never()).syncLiteSubscription(any(), any());
+    }
+
+    @Test
+    public void 
testSubscribeLiteQuotaExceededThenUnsubscribeAndSubscribeAgain() throws 
ClientException {
+        String liteTopic1 = "testLiteTopic1";
+        String liteTopic2 = "testLiteTopic2";
+        doNothing().when(consumer).checkRunning();
+        doReturn(Futures.immediateVoidFuture()).when(consumer)
+            .syncLiteSubscription(any(LiteSubscriptionAction.class), 
anyCollection());
+        when(spySettings.getLiteSubscriptionQuota()).thenReturn(1);
+
+        consumer.subscribeLite(liteTopic1);
+        assertThat(spySettings.getLiteTopicSetSize()).isEqualTo(1);
+
+        assertThatThrownBy(() -> consumer.subscribeLite(liteTopic2))
+            .isInstanceOf(LiteSubscriptionQuotaExceededException.class);
+        assertThat(spySettings.getLiteTopicSetSize()).isEqualTo(1);
+
+        consumer.unsubscribeLite(liteTopic1);
+        assertThat(spySettings.getLiteTopicSetSize()).isEqualTo(0);
+
+        consumer.subscribeLite(liteTopic2);
+        assertThat(spySettings.getLiteTopicSetSize()).isEqualTo(1);
+
+        verify(spySettings, times(1)).addLiteTopic(liteTopic1);
+        verify(spySettings, times(1)).removeLiteTopic(liteTopic1);
+        verify(spySettings, times(1)).addLiteTopic(liteTopic2);
+    }
+}
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index d9aa61fb..e3ecfe46 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -92,7 +92,7 @@ public class ProcessQueueImplTest extends TestBase {
         field1.setAccessible(true);
         field1.set(pushConsumer, consumptionErrorQuantity);
 
-        
when(pushConsumer.getPushConsumerSettings()).thenReturn(pushSubscriptionSettings);
+        when(pushConsumer.getSettings()).thenReturn(pushSubscriptionSettings);
         when(pushConsumer.getScheduler()).thenReturn(SCHEDULER);
 
         AtomicLong receivedMessagesQuantity = new AtomicLong(0);
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
index a771fc25..3ad28f95 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
@@ -29,9 +29,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
-import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.misc.ClientId;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Assert;
@@ -41,13 +41,19 @@ public class PushSubscriptionSettingsTest extends TestBase {
 
     @Test
     public void testToProtobuf() {
-        Resource groupResource = new Resource(FAKE_NAMESPACE, 
FAKE_CONSUMER_GROUP_0);
+        final Duration requestTimeout = Duration.ofSeconds(3);
+        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
+            .setNamespace(FAKE_NAMESPACE)
+            .setRequestTimeout(requestTimeout)
+            .setEndpoints(FAKE_ENDPOINTS)
+            .build();
         ClientId clientId = new ClientId();
         Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
         subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression());
-        final Duration requestTimeout = Duration.ofSeconds(3);
-        final PushSubscriptionSettings pushSubscriptionSettings = new 
PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
-            fakeEndpoints(), groupResource, requestTimeout, 
subscriptionExpression);
+        final PushSubscriptionSettings pushSubscriptionSettings = new 
PushSubscriptionSettings(
+            clientConfiguration, clientId,
+            org.apache.rocketmq.client.java.impl.ClientType.PUSH_CONSUMER,
+            fakeEndpoints(), FAKE_CONSUMER_GROUP_0, subscriptionExpression);
         final Settings settings = pushSubscriptionSettings.toProtobuf();
         Assert.assertEquals(settings.getClientType(), 
ClientType.PUSH_CONSUMER);
         Assert.assertEquals(settings.getRequestTimeout(), 
Durations.fromNanos(requestTimeout.toNanos()));
@@ -72,15 +78,20 @@ public class PushSubscriptionSettingsTest extends TestBase {
 
     @Test
     public void testToProtobufWithSqlExpression() {
-        Resource groupResource = new Resource(FAKE_NAMESPACE, FAKE_CONSUMER_GROUP_0);
+        final Duration requestTimeout = Duration.ofSeconds(3);
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+            .setNamespace(FAKE_NAMESPACE)
+            .setRequestTimeout(requestTimeout)
+            .setEndpoints(FAKE_ENDPOINTS)
+            .build();
         ClientId clientId = new ClientId();
-
         Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
         subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 AND a < 100) OR (b IS NOT NULL AND "
             + "b=TRUE)", FilterExpressionType.SQL92));
-        final Duration requestTimeout = Duration.ofSeconds(3);
-        final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
-            fakeEndpoints(), groupResource, requestTimeout, subscriptionExpression);
+        final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(
+            clientConfiguration, clientId,
+            org.apache.rocketmq.client.java.impl.ClientType.PUSH_CONSUMER,
+            fakeEndpoints(), FAKE_CONSUMER_GROUP_0, subscriptionExpression);
         final Settings settings = pushSubscriptionSettings.toProtobuf();
         Assert.assertEquals(settings.getClientType(), ClientType.PUSH_CONSUMER);
         Assert.assertEquals(settings.getRequestTimeout(), Durations.fromNanos(requestTimeout.toNanos()));
@@ -121,14 +132,20 @@ public class PushSubscriptionSettingsTest extends TestBase {
         Subscription subscription = Subscription.newBuilder().setFifo(fifo).setReceiveBatchSize(receiveBatchSize)
             .setLongPollingTimeout(longPollingTimeout).build();
         Settings settings = Settings.newBuilder().setSubscription(subscription).setBackoffPolicy(retryPolicy).build();
-        Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
         ClientId clientId = new ClientId();
         Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
         subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 AND a < 100) OR (b IS NOT NULL AND "
             + "b=TRUE)", FilterExpressionType.SQL92));
         final Duration requestTimeout = Duration.ofSeconds(3);
-        final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
-            fakeEndpoints(), groupResource, requestTimeout, subscriptionExpression);
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+            .setNamespace(FAKE_NAMESPACE)
+            .setRequestTimeout(requestTimeout)
+            .setEndpoints(FAKE_ENDPOINTS)
+            .build();
+        final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(
+            clientConfiguration, clientId,
+            org.apache.rocketmq.client.java.impl.ClientType.PUSH_CONSUMER,
+            fakeEndpoints(), FAKE_CONSUMER_GROUP_0, subscriptionExpression);
         pushSubscriptionSettings.sync(settings);
     }
 }
\ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
index 2f1de06c..23f445c7 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
@@ -27,36 +27,146 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Test;
 
 public class GeneralMessageImplTest extends TestBase {
+
     @Test
-    public void testMessage() {
+    public void testMessageTagKeysProperty() {
         String topic = "testTopic";
         byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
         String tag = "tagA";
+        String key1 = "keyA";
+        String key2 = "keyB";
+        String propertyKey1 = "propertyKey1";
+        String propertyValue1 = "propertyValue1";
+        String propertyKey2 = "propertyKey2";
+        String propertyValue2 = "propertyValue2";
+
         List<String> keys = new ArrayList<>();
-        keys.add("keyA");
-        String messageGroup = "messageGroup0";
-        long deliveryTimestamp = System.currentTimeMillis();
+        keys.add(key1);
+        keys.add(key2);
+
         Map<String, String> properties = new HashMap<>();
-        properties.put("propertyA", "valueA");
+        properties.put(propertyKey1, propertyValue1);
+        properties.put(propertyKey2, propertyValue2);
+
+        final Message message = new MessageBuilderImpl()
+            .setTopic(topic)
+            .setBody(body)
+            .setTag(tag)
+            .setKeys(key1, key2)
+            .addProperty(propertyKey1, propertyValue1)
+            .addProperty(propertyKey2, propertyValue2)
+            .build();
 
-        final MessageImpl message = new MessageImpl(topic, body, tag, keys, messageGroup, deliveryTimestamp,
-            properties);
         final GeneralMessageImpl generalMessage = new GeneralMessageImpl(message);
         assertFalse(generalMessage.getMessageId().isPresent());
         assertEquals(topic, generalMessage.getTopic());
         assertEquals(ByteBuffer.wrap(body), generalMessage.getBody());
-        assertEquals(properties, generalMessage.getProperties());
         assertTrue(generalMessage.getTag().isPresent());
         assertEquals(tag, generalMessage.getTag().get());
         assertEquals(keys, generalMessage.getKeys());
+        assertFalse(generalMessage.getBornHost().isPresent());
+        assertFalse(generalMessage.getBornTimestamp().isPresent());
+        assertFalse(generalMessage.getDeliveryAttempt().isPresent());
+        assertFalse(generalMessage.getDecodeTimestamp().isPresent());
+        assertFalse(generalMessage.getTransportDeliveryTimestamp().isPresent());
+
+        assertFalse(generalMessage.getMessageGroup().isPresent());
+        assertFalse(generalMessage.getLiteTopic().isPresent());
+        assertFalse(generalMessage.getDeliveryTimestamp().isPresent());
+
+        // Verify properties
+        Map<String, String> messageProperties = generalMessage.getProperties();
+        assertEquals(properties.size(), messageProperties.size());
+        assertEquals(properties, messageProperties);
+    }
+
+    @Test
+    public void testMessageGroup() {
+        String topic = "testTopic";
+        byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
+        String messageGroup = "messageGroup0";
+
+        final Message message = new MessageBuilderImpl()
+            .setTopic(topic)
+            .setBody(body)
+            .setMessageGroup(messageGroup)
+            .build();
+
+        final GeneralMessageImpl generalMessage = new GeneralMessageImpl(message);
+        assertFalse(generalMessage.getMessageId().isPresent());
+        assertEquals(topic, generalMessage.getTopic());
+        assertEquals(ByteBuffer.wrap(body), generalMessage.getBody());
+        assertFalse(generalMessage.getTag().isPresent());
+        assertEquals(0, generalMessage.getKeys().size());
         assertTrue(generalMessage.getMessageGroup().isPresent());
         assertEquals(messageGroup, generalMessage.getMessageGroup().get());
+        assertFalse(generalMessage.getBornHost().isPresent());
+        assertFalse(generalMessage.getBornTimestamp().isPresent());
+        assertFalse(generalMessage.getDeliveryAttempt().isPresent());
+        assertFalse(generalMessage.getDecodeTimestamp().isPresent());
+        assertFalse(generalMessage.getTransportDeliveryTimestamp().isPresent());
+
+        assertFalse(generalMessage.getLiteTopic().isPresent());
+        assertFalse(generalMessage.getDeliveryTimestamp().isPresent());
+    }
+
+    @Test
+    public void testMessageLiteTopic() {
+        String topic = "testTopic";
+        byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
+        String liteTopic = "liteTopic0";
+
+        final Message message = new MessageBuilderImpl()
+            .setTopic(topic)
+            .setBody(body)
+            .setLiteTopic(liteTopic)
+            .build();
+
+        final GeneralMessageImpl generalMessage = new GeneralMessageImpl(message);
+        assertFalse(generalMessage.getMessageId().isPresent());
+        assertEquals(topic, generalMessage.getTopic());
+        assertEquals(ByteBuffer.wrap(body), generalMessage.getBody());
+        assertFalse(generalMessage.getTag().isPresent());
+        assertEquals(0, generalMessage.getKeys().size());
+        assertFalse(generalMessage.getMessageGroup().isPresent());
+        assertTrue(generalMessage.getLiteTopic().isPresent());
+        assertEquals(liteTopic, generalMessage.getLiteTopic().get());
+        assertFalse(generalMessage.getBornHost().isPresent());
+        assertFalse(generalMessage.getBornTimestamp().isPresent());
+        assertFalse(generalMessage.getDeliveryAttempt().isPresent());
+        assertFalse(generalMessage.getDecodeTimestamp().isPresent());
+        assertFalse(generalMessage.getTransportDeliveryTimestamp().isPresent());
+
+        assertFalse(generalMessage.getDeliveryTimestamp().isPresent());
+    }
+
+    @Test
+    public void testMessageDeliveryTimestamp() {
+        String topic = "testTopic";
+        byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
+        long deliveryTimestamp = System.currentTimeMillis();
+
+        final Message message = new MessageBuilderImpl()
+            .setTopic(topic)
+            .setBody(body)
+            .setDeliveryTimestamp(deliveryTimestamp)
+            .build();
+
+        final GeneralMessageImpl generalMessage = new GeneralMessageImpl(message);
+        assertFalse(generalMessage.getMessageId().isPresent());
+        assertEquals(topic, generalMessage.getTopic());
+        assertEquals(ByteBuffer.wrap(body), generalMessage.getBody());
+        assertFalse(generalMessage.getTag().isPresent());
+        assertEquals(0, generalMessage.getKeys().size());
+        assertFalse(generalMessage.getMessageGroup().isPresent());
+        assertFalse(generalMessage.getLiteTopic().isPresent());
         assertTrue(generalMessage.getDeliveryTimestamp().isPresent());
         assertEquals(deliveryTimestamp, (long) generalMessage.getDeliveryTimestamp().get());
         assertFalse(generalMessage.getBornHost().isPresent());
@@ -73,6 +183,7 @@ public class GeneralMessageImplTest extends TestBase {
         byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
         String tag = "tagA";
         String messageGroup = "messageGroup0";
+        String liteTopic = "liteTopic0";
         long deliveryTimestamp = System.currentTimeMillis();
         List<String> keys = new ArrayList<>();
         keys.add("keyA");
@@ -87,7 +198,8 @@ public class GeneralMessageImplTest extends TestBase {
         boolean corrupted = false;
         long transportDeliveryTimestamp = System.currentTimeMillis();
 
-        final MessageViewImpl messageView = new MessageViewImpl(messageId, topic, body, tag, messageGroup,
+        final MessageViewImpl messageView = new MessageViewImpl(messageId, topic, body, tag,
+            messageGroup, liteTopic,
             deliveryTimestamp, keys, properties, bornHost, bornTimestamp, deliveryAttempt, mq, receiptHandle,
             offset, corrupted, transportDeliveryTimestamp);
         final GeneralMessageImpl generalMessage = new GeneralMessageImpl(messageView);
@@ -101,6 +213,8 @@ public class GeneralMessageImplTest extends TestBase {
         assertEquals(keys, generalMessage.getKeys());
         assertTrue(generalMessage.getMessageGroup().isPresent());
         assertEquals(messageGroup, generalMessage.getMessageGroup().get());
+        assertTrue(generalMessage.getLiteTopic().isPresent());
+        assertEquals(liteTopic, generalMessage.getLiteTopic().get());
         assertTrue(generalMessage.getDeliveryTimestamp().isPresent());
         assertEquals(deliveryTimestamp, (long) generalMessage.getDeliveryTimestamp().get());
         assertTrue(generalMessage.getBornHost().isPresent());
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
index a9a918fb..88797670 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
@@ -131,4 +131,21 @@ public class MessageImplTest extends TestBase {
         assertFalse(message.getDeliveryTimestamp().isPresent());
         assertFalse(message.getMessageGroup().isPresent());
     }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testLiteTopicSetterWithSpaces() {
+        provider.newMessageBuilder().setLiteTopic("  ");
+    }
+
+    @Test
+    public void testLiteTopicSetter() {
+        final Message message = provider.newMessageBuilder()
+            .setLiteTopic("liteTopicA")
+            .setTopic(FAKE_TOPIC_0)
+            .setBody(FAKE_MESSAGE_BODY)
+            .build();
+        assertTrue(message.getLiteTopic().isPresent());
+        assertEquals("liteTopicA", message.getLiteTopic().get());
+    }
+
 }
\ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 7f80f155..1d7e7581 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -183,7 +183,7 @@ public class TestBase {
         final byte[] body = RandomUtils.nextBytes(bodySize);
         Map<String, String> properties = new HashMap<>();
         List<String> keys = new ArrayList<>();
-        return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null, null,
+        return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null, null, null,
             keys, properties, FAKE_HOST_0, 1, 1, mq, FAKE_RECEIPT_HANDLE_0, 1, corrupted,
             System.currentTimeMillis());
     }
diff --git a/java/pom.xml b/java/pom.xml
index 7e9aac66..2ac39dc2 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -26,7 +26,7 @@
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-client-java-parent</artifactId>
     <packaging>pom</packaging>
-    <version>5.0.9-SNAPSHOT</version>
+    <version>5.1.0-SNAPSHOT</version>
     <modules>
         <module>client-apis</module>
         <module>client</module>
@@ -48,7 +48,7 @@
            ~  1. Whether it is essential, because the current shaded jar is fat enough.
            ~  2. Make sure that it is compatible with Java 8.
          -->
-        <rocketmq-proto.version>2.0.5</rocketmq-proto.version>
+        <rocketmq-proto.version>2.1.0</rocketmq-proto.version>
         <annotations-api.version>1.3.5</annotations-api.version>
         <protobuf.version>3.24.4</protobuf.version>
         <grpc.version>1.50.0</grpc.version>
diff --git a/java/test/pom.xml b/java/test/pom.xml
index e834b6fb..78bff647 100644
--- a/java/test/pom.xml
+++ b/java/test/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-client-java-parent</artifactId>
-        <version>5.0.9-SNAPSHOT</version>
+        <version>5.1.0-SNAPSHOT</version>
     </parent>
 
     <artifactId>test</artifactId>


Reply via email to