This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a25089  Add more unit tests (#148)
1a25089 is described below

commit 1a25089115669dfa9ce573302d17975a39cfccf3
Author: Aaron Ai <[email protected]>
AuthorDate: Thu Aug 11 15:43:47 2022 +0800

    Add more unit tests (#148)
---
 .../rocketmq/client/java/impl/ClientImpl.java      |  49 ++++----
 .../client/java/impl/ClientManagerImpl.java        |  47 ++++----
 .../client/java/impl/ClientSessionImpl.java        |  27 ++---
 .../impl/{ClientSettings.java => Settings.java}    |  20 +---
 .../client/java/impl/consumer/ConsumerImpl.java    |  26 ++---
 .../java/impl/consumer/ProcessQueueImpl.java       |   2 +-
 .../java/impl/consumer/PushConsumerImpl.java       |  26 ++---
 ...Settings.java => PushSubscriptionSettings.java} |  29 +++--
 .../java/impl/consumer/SimpleConsumerImpl.java     |  12 +-
 ...ttings.java => SimpleSubscriptionSettings.java} |  34 +++---
 .../java/impl/producer/ClientSessionHandler.java   |   6 -
 .../client/java/impl/producer/ProducerImpl.java    |  27 +++--
 ...oducerSettings.java => PublishingSettings.java} |  21 ++--
 .../client/java/impl/producer/TransactionImpl.java |  10 +-
 .../client/java/message/PublishingMessageImpl.java |   6 +-
 .../client/java/message/protocol/Resource.java     |  12 +-
 .../rocketmq/client/java/misc/Utilities.java       |  14 ++-
 .../client/java/impl/ClientManagerImplTest.java    |   5 +-
 .../java/impl/consumer/ConsumeServiceTest.java     | 114 +++++++++++++++++++
 .../client/java/impl/consumer/ConsumeTaskTest.java |  55 +++++++++
 .../java/impl/consumer/ConsumerImplTest.java       | 123 +++++++++++++++++++++
 ...erviceTest.java => FifoConsumeServiceTest.java} |  36 +++---
 .../java/impl/consumer/ProcessQueueImplTest.java   |  14 +--
 .../impl/consumer/PushConsumerBuilderImplTest.java |   4 +-
 .../java/impl/consumer/PushConsumerImplTest.java   |   4 +-
 .../consumer/PushSubscriptionSettingsTest.java     | 120 ++++++++++++++++++++
 .../impl/consumer/SimpleConsumerBuilderTest.java   |   4 +-
 .../java/impl/consumer/SimpleConsumerImplTest.java |  20 ++--
 .../consumer/SimpleSubscriptionSettingsTest.java   |  94 ++++++++++++++++
 .../impl/consumer/StandardConsumeServiceTest.java  |  10 +-
 .../impl/producer/ProducerBuilderImplTest.java     |  35 ++++++
 .../java/impl/producer/ProducerImplTest.java       |   4 +-
 .../java/impl/producer/TransactionImplTest.java    |  93 ++++++++++++++++
 .../protocol/EncodingTest.java}                    |  34 +++---
 .../client/java/message/protocol/ResourceTest.java |  54 +++++++++
 .../rocketmq/client/java/misc/UtilitiesTest.java   |  64 +++++++++++
 .../apache/rocketmq/client/java/tool/TestBase.java |  10 +-
 java/style/spotbugs-suppressions.xml               |   8 +-
 38 files changed, 1003 insertions(+), 270 deletions(-)

diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 0a3257a..e7e6fd6 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -29,7 +29,6 @@ import apache.rocketmq.v2.QueryRouteRequest;
 import apache.rocketmq.v2.QueryRouteResponse;
 import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
 import apache.rocketmq.v2.Resource;
-import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.Status;
 import apache.rocketmq.v2.TelemetryCommand;
 import apache.rocketmq.v2.ThreadStackTrace;
@@ -100,7 +99,6 @@ public abstract class ClientImpl extends AbstractIdleService 
implements Client,
      */
     private static final Duration TELEMETRY_TIMEOUT = Duration.ofDays(60 * 
365);
 
-    protected final ClientManager clientManager;
     protected final ClientConfiguration clientConfiguration;
     protected final Endpoints endpoints;
     protected final Set<String> topics;
@@ -114,6 +112,7 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
     protected final ThreadPoolExecutor telemetryCommandExecutor;
     protected final String clientId;
 
+    private final ClientManager clientManager;
     private volatile ScheduledFuture<?> updateRouteCacheFuture;
     private final ConcurrentMap<String, TopicRouteData> topicRouteCache;
 
@@ -276,7 +275,7 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
 
     @Override
     public TelemetryCommand settingsCommand() {
-        final Settings settings = this.getClientSettings().toProtobuf();
+        final apache.rocketmq.v2.Settings settings = 
this.getSettings().toProtobuf();
         return TelemetryCommand.newBuilder().setSettings(settings).build();
     }
 
@@ -299,12 +298,6 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
         return !totalRouteEndpoints.contains(endpoints);
     }
 
-    @Override
-    public ListenableFuture<Void> awaitSettingSynchronized() {
-        return Futures.transformAsync(this.getClientSettings().arrivedFuture,
-            (clientSettings) -> Futures.immediateVoidFuture(), 
clientCallbackExecutor);
-    }
-
     /**
      * This method is invoked while request of printing thread stack trace is 
received from remote.
      *
@@ -338,7 +331,7 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
         }
     }
 
-    public abstract ClientSettings getClientSettings();
+    public abstract Settings getSettings();
 
     /**
      * Apply setting from remote.
@@ -347,10 +340,10 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
      * @param settings  settings received from remote.
      */
     @Override
-    public final void onSettingsCommand(Endpoints endpoints, Settings 
settings) {
+    public final void onSettingsCommand(Endpoints endpoints, 
apache.rocketmq.v2.Settings settings) {
         final Metric metric = new Metric(settings.getMetric());
         clientMeterProvider.reset(metric);
-        this.getClientSettings().applySettingsCommand(settings);
+        this.getSettings().sync(settings);
     }
 
     /**
@@ -358,7 +351,7 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
      */
     @Override
     public void syncSettings() {
-        final Settings settings = getClientSettings().toProtobuf();
+        final apache.rocketmq.v2.Settings settings = 
getSettings().toProtobuf();
         final TelemetryCommand command = 
TelemetryCommand.newBuilder().setSettings(settings).build();
         final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints();
         for (Endpoints endpoints : totalRouteEndpoints) {
@@ -373,7 +366,7 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
     public void telemetry(Endpoints endpoints, TelemetryCommand command) {
         try {
             final ClientSessionImpl clientSession = 
getClientSession(endpoints);
-            clientSession.fireWrite(command);
+            clientSession.write(command);
         } catch (Throwable t) {
             LOGGER.error("Failed to fire write telemetry command, clientId={}, 
endpoints={}", clientId, endpoints, t);
         }
@@ -412,15 +405,6 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
         }
     }
 
-    private ListenableFuture<Void> syncSettingsSafely(Endpoints endpoints) {
-        try {
-            final ClientSessionImpl clientSession = 
getClientSession(endpoints);
-            return clientSession.syncSettingsSafely();
-        } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
-        }
-    }
-
     /**
      * Triggered when {@link TopicRouteData} is fetched from remote.
      *
@@ -432,15 +416,18 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
             .map(mq -> mq.getBroker().getEndpoints())
             .collect(Collectors.toSet());
         final Set<Endpoints> existRouteEndpoints = getTotalRouteEndpoints();
-        final Set<Endpoints> newEndpoints = new 
HashSet<>(Sets.difference(routeEndpoints,
-            existRouteEndpoints));
-        final List<ListenableFuture<Void>> futures =
-            
newEndpoints.stream().map(this::syncSettingsSafely).collect(Collectors.toList());
-        return Futures.whenAllSucceed(futures).callAsync(() -> {
+        final Set<Endpoints> newEndpoints = new 
HashSet<>(Sets.difference(routeEndpoints, existRouteEndpoints));
+        try {
+            for (Endpoints endpoints : newEndpoints) {
+                final ClientSessionImpl clientSession = 
getClientSession(endpoints);
+                clientSession.syncSettings();
+            }
             topicRouteCache.put(topic, topicRouteData);
             onTopicRouteDataUpdate0(topic, topicRouteData);
             return Futures.immediateFuture(topicRouteData);
-        }, clientCallbackExecutor);
+        } catch (Throwable t) {
+            return Futures.immediateFailedFuture(t);
+        }
     }
 
     public void onTopicRouteDataUpdate0(String topic, TopicRouteData 
topicRouteData) {
@@ -523,6 +510,10 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
         }
     }
 
+    public ClientManager getClientManager() {
+        return clientManager;
+    }
+
     /**
      * @see Client#clientId()
      */
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index fc3e0eb..e9bc30a 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -59,6 +59,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.net.ssl.SSLException;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.misc.ExecutorServices;
+import org.apache.rocketmq.client.java.misc.MetadataUtils;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.rpc.RpcClient;
@@ -79,7 +80,7 @@ public class ClientManagerImpl extends ClientManager {
     public static final Duration HEART_BEAT_INITIAL_DELAY = 
Duration.ofSeconds(1);
     public static final Duration HEART_BEAT_PERIOD = Duration.ofSeconds(10);
 
-    public static final Duration LOG_STATS_INITIAL_DELAY = 
Duration.ofSeconds(60);
+    public static final Duration LOG_STATS_INITIAL_DELAY = 
Duration.ofSeconds(1);
     public static final Duration LOG_STATS_PERIOD = Duration.ofSeconds(60);
 
     public static final Duration SYNC_SETTINGS_DELAY = Duration.ofSeconds(1);
@@ -88,6 +89,7 @@ public class ClientManagerImpl extends ClientManager {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ClientManagerImpl.class);
 
     private final Client client;
+
     @GuardedBy("rpcClientTableLock")
     private final Map<Endpoints, RpcClient> rpcClientTable;
     private final ReadWriteLock rpcClientTableLock;
@@ -132,14 +134,15 @@ public class ClientManagerImpl extends ClientManager {
             while (it.hasNext()) {
                 final Map.Entry<Endpoints, RpcClient> entry = it.next();
                 final Endpoints endpoints = entry.getKey();
-                final RpcClient client = entry.getValue();
+                final RpcClient rpcClient = entry.getValue();
 
-                final Duration idleDuration = client.idleDuration();
+                final Duration idleDuration = rpcClient.idleDuration();
                 if (idleDuration.compareTo(RPC_CLIENT_MAX_IDLE_DURATION) > 0) {
                     it.remove();
-                    client.shutdown();
+                    rpcClient.shutdown();
                     LOGGER.info("Rpc client has been idle for a long time, 
endpoints={}, idleDuration={}, " +
-                        "rpcClientMaxIdleDuration={}", endpoints, 
idleDuration, RPC_CLIENT_MAX_IDLE_DURATION);
+                            "rpcClientMaxIdleDuration={}, clientId={}", 
endpoints, idleDuration,
+                        RPC_CLIENT_MAX_IDLE_DURATION, client.clientId());
                 }
             }
         } finally {
@@ -176,7 +179,7 @@ public class ClientManagerImpl extends ClientManager {
             try {
                 rpcClient = new RpcClientImpl(endpoints);
             } catch (SSLException e) {
-                LOGGER.error("Failed to get rpc client, endpoints={}", 
endpoints);
+                LOGGER.error("Failed to get RPC client, endpoints={}, 
clientId={}", endpoints, client.clientId(), e);
                 throw new ClientException("Failed to generate RPC client", e);
             }
             rpcClientTable.put(endpoints, rpcClient);
@@ -310,13 +313,14 @@ public class ClientManagerImpl extends ClientManager {
 
     @Override
     protected void startUp() {
-        LOGGER.info("Begin to start the client manager");
+        final String clientId = client.clientId();
+        LOGGER.info("Begin to start the client manager, clientId={}", 
clientId);
         scheduler.scheduleWithFixedDelay(
             () -> {
                 try {
                     clearIdleRpcClients();
                 } catch (Throwable t) {
-                    LOGGER.error("Exception raised while clear idle rpc 
clients.", t);
+                    LOGGER.error("Exception raised during the clearing of idle 
rpc clients, clientId={}", clientId, t);
                 }
             },
             RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY.toNanos(),
@@ -329,7 +333,7 @@ public class ClientManagerImpl extends ClientManager {
                 try {
                     client.doHeartbeat();
                 } catch (Throwable t) {
-                    LOGGER.error("Exception raised while heartbeat.", t);
+                    LOGGER.error("Exception raised during heartbeat, 
clientId={}", clientId, t);
                 }
             },
             HEART_BEAT_INITIAL_DELAY.toNanos(),
@@ -340,9 +344,11 @@ public class ClientManagerImpl extends ClientManager {
         scheduler.scheduleWithFixedDelay(
             () -> {
                 try {
+                    LOGGER.info("Start to log statistics, clientVersion={}, 
clientWrapperVersion={}, clientId={}",
+                        MetadataUtils.getVersion(), 
MetadataUtils.getWrapperVersion(), clientId);
                     client.doStats();
                 } catch (Throwable t) {
-                    LOGGER.error("Exception raised while log stats.", t);
+                    LOGGER.error("Exception raised during statistics logging, 
clientId={}", clientId, t);
                 }
             },
             LOG_STATS_INITIAL_DELAY.toNanos(),
@@ -355,25 +361,26 @@ public class ClientManagerImpl extends ClientManager {
                 try {
                     client.syncSettings();
                 } catch (Throwable t) {
-                    LOGGER.error("Exception raised during the setting 
synchronizing.", t);
+                    LOGGER.error("Exception raised during the setting 
synchronization, clientId={}", clientId, t);
                 }
             },
             SYNC_SETTINGS_DELAY.toNanos(),
             SYNC_SETTINGS_PERIOD.toNanos(),
             TimeUnit.NANOSECONDS
         );
-        LOGGER.info("The client manager starts successfully");
+        LOGGER.info("The client manager starts successfully, clientId={}", 
clientId);
     }
 
     @Override
     protected void shutDown() throws IOException {
-        LOGGER.info("Begin to shutdown the client manager");
+        final String clientId = client.clientId();
+        LOGGER.info("Begin to shutdown the client manager, clientId={}", 
clientId);
         scheduler.shutdown();
         try {
             if (!ExecutorServices.awaitTerminated(scheduler)) {
-                LOGGER.error("[Bug] Timeout to shutdown the client scheduler");
+                LOGGER.error("[Bug] Timeout to shutdown the client scheduler, 
clientId={}", clientId);
             } else {
-                LOGGER.info("Shutdown the client scheduler successfully");
+                LOGGER.info("Shutdown the client scheduler successfully, 
clientId={}", clientId);
             }
             rpcClientTableLock.writeLock().lock();
             try {
@@ -387,17 +394,17 @@ public class ClientManagerImpl extends ClientManager {
             } finally {
                 rpcClientTableLock.writeLock().unlock();
             }
-            LOGGER.info("Shutdown all rpc client(s) successfully");
+            LOGGER.info("Shutdown all rpc client(s) successfully, 
clientId={}", clientId);
             asyncWorker.shutdown();
             if (!ExecutorServices.awaitTerminated(asyncWorker)) {
-                LOGGER.error("[Bug] Timeout to shutdown the client async 
worker");
+                LOGGER.error("[Bug] Timeout to shutdown the client async 
worker, clientId={}", clientId);
             } else {
-                LOGGER.info("Shutdown the client async worker successfully");
+                LOGGER.info("Shutdown the client async worker successfully, 
clientId={}", clientId);
             }
         } catch (InterruptedException e) {
-            LOGGER.error("[Bug] Unexpected exception raised while shutdown 
client manager", e);
+            LOGGER.error("[Bug] Unexpected exception raised while shutdown 
client manager, clientId={}", clientId, e);
             throw new IOException(e);
         }
-        LOGGER.info("Shutdown the client manager successfully");
+        LOGGER.info("Shutdown the client manager successfully, clientId={}", 
clientId);
     }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index b699ab0..f2edc70 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -22,11 +22,12 @@ import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
 import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.TelemetryCommand;
 import apache.rocketmq.v2.VerifyMessageCommand;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import io.grpc.stub.StreamObserver;
 import java.time.Duration;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
 import org.apache.rocketmq.client.java.route.Endpoints;
@@ -39,14 +40,17 @@ import org.slf4j.LoggerFactory;
 public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ClientSessionImpl.class);
     private static final Duration REQUEST_OBSERVER_RENEW_BACKOFF_DELAY = 
Duration.ofSeconds(1);
+    private static final long SETTINGS_INITIALIZATION_TIMEOUT_MILLIS = 3000;
 
     private final ClientSessionHandler sessionHandler;
     private final Endpoints endpoints;
+    private final SettableFuture<Settings> settingsSettableFuture;
     private volatile StreamObserver<TelemetryCommand> requestObserver;
 
     protected ClientSessionImpl(ClientSessionHandler sessionHandler, Endpoints 
endpoints) throws ClientException {
         this.sessionHandler = sessionHandler;
         this.endpoints = endpoints;
+        this.settingsSettableFuture = SettableFuture.create();
         this.requestObserver = sessionHandler.telemetry(endpoints, this);
     }
 
@@ -64,21 +68,17 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
                 REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.toNanos(), 
TimeUnit.NANOSECONDS);
             return;
         }
-        syncSettings();
+        syncSettings0();
     }
 
-    protected ListenableFuture<Void> syncSettingsSafely() {
-        try {
-            this.syncSettings();
-            return sessionHandler.awaitSettingSynchronized();
-        } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
-        }
+    protected void syncSettings() throws TimeoutException, ExecutionException, 
InterruptedException {
+        this.syncSettings0();
+        settingsSettableFuture.get(SETTINGS_INITIALIZATION_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS);
     }
 
-    private void syncSettings() {
+    private void syncSettings0() {
         final TelemetryCommand settings = sessionHandler.settingsCommand();
-        fireWrite(settings);
+        write(settings);
     }
 
     /**
@@ -95,7 +95,7 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
         }
     }
 
-    void fireWrite(TelemetryCommand command) {
+    void write(TelemetryCommand command) {
         if (null == requestObserver) {
             LOGGER.error("Request observer does not exist, ignore current 
command, endpoints={}, command={}",
                 endpoints, command);
@@ -113,6 +113,7 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
                     final Settings settings = command.getSettings();
                     LOGGER.info("Receive settings from remote, endpoints={}, 
clientId={}", endpoints, clientId);
                     sessionHandler.onSettingsCommand(endpoints, settings);
+                    settingsSettableFuture.set(settings);
                     break;
                 }
                 case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
 b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
similarity index 73%
rename from 
java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
rename to 
java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
index bc50c4f..e18c82c 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
@@ -17,47 +17,39 @@
 
 package org.apache.rocketmq.client.java.impl;
 
-import apache.rocketmq.v2.Settings;
 import com.google.common.base.MoreObjects;
-import com.google.common.util.concurrent.SettableFuture;
 import java.time.Duration;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 
-public abstract class ClientSettings {
+public abstract class Settings {
     protected final String clientId;
     protected final ClientType clientType;
     protected final Endpoints accessPoint;
     protected volatile RetryPolicy retryPolicy;
     protected final Duration requestTimeout;
-    protected final SettableFuture<Void> arrivedFuture;
 
-    public ClientSettings(String clientId, ClientType clientType, Endpoints 
accessPoint,
-        RetryPolicy retryPolicy, Duration requestTimeout) {
+    public Settings(String clientId, ClientType clientType, Endpoints 
accessPoint, RetryPolicy retryPolicy,
+        Duration requestTimeout) {
         this.clientId = clientId;
         this.clientType = clientType;
         this.accessPoint = accessPoint;
         this.retryPolicy = retryPolicy;
         this.requestTimeout = requestTimeout;
-        this.arrivedFuture = SettableFuture.create();
     }
 
-    public ClientSettings(String clientId, ClientType clientType, Endpoints 
accessPoint, Duration requestTimeout) {
+    public Settings(String clientId, ClientType clientType, Endpoints 
accessPoint, Duration requestTimeout) {
         this(clientId, clientType, accessPoint, null, requestTimeout);
     }
 
-    public abstract Settings toProtobuf();
+    public abstract apache.rocketmq.v2.Settings toProtobuf();
 
-    public abstract void applySettingsCommand(Settings settings);
+    public abstract void sync(apache.rocketmq.v2.Settings settings);
 
     public RetryPolicy getRetryPolicy() {
         return retryPolicy;
     }
 
-    public SettableFuture<Void> getArrivedFuture() {
-        return arrivedFuture;
-    }
-
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 791a8f2..58b19b5 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -36,7 +36,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.Timestamp;
 import com.google.protobuf.util.Durations;
 import io.grpc.Metadata;
@@ -83,7 +82,7 @@ abstract class ConsumerImpl extends ClientImpl {
             final Duration tolerance = clientConfiguration.getRequestTimeout();
             final Duration timeout = Duration.ofNanos(awaitDuration.toNanos() 
+ tolerance.toNanos());
             final 
ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> future =
-                clientManager.receiveMessage(endpoints, metadata, request, 
timeout);
+                this.getClientManager().receiveMessage(endpoints, metadata, 
request, timeout);
             return Futures.transformAsync(future, invocation -> {
                 final Iterator<ReceiveMessageResponse> it = 
invocation.getResponse();
                 Status status = 
Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
@@ -151,11 +150,10 @@ abstract class ConsumerImpl extends ClientImpl {
         try {
             final AckMessageRequest request = 
wrapAckMessageRequest(messageView);
             final Metadata metadata = sign();
-            future = clientManager.ackMessage(endpoints, metadata, request, 
clientConfiguration.getRequestTimeout());
+            final Duration requestTimeout = 
clientConfiguration.getRequestTimeout();
+            future = this.getClientManager().ackMessage(endpoints, metadata, 
request, requestTimeout);
         } catch (Throwable t) {
-            final SettableFuture<RpcInvocation<AckMessageResponse>> future0 = 
SettableFuture.create();
-            future0.setException(t);
-            future = future0;
+            future = Futures.immediateFailedFuture(t);
         }
         Futures.addCallback(future, new 
FutureCallback<RpcInvocation<AckMessageResponse>>() {
             @Override
@@ -178,7 +176,7 @@ abstract class ConsumerImpl extends ClientImpl {
         return future;
     }
 
-    public ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
changeInvisibleDuration(
+    ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
changeInvisibleDuration(
         MessageViewImpl messageView, Duration invisibleDuration) {
         final Endpoints endpoints = messageView.getEndpoints();
         ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
future;
@@ -187,14 +185,12 @@ abstract class ConsumerImpl extends ClientImpl {
         final List<MessageCommon> messageCommons = 
Collections.singletonList(messageView.getMessageCommon());
         doBefore(MessageHookPoints.CHANGE_INVISIBLE_DURATION, messageCommons);
         try {
-            final ChangeInvisibleDurationRequest request = 
wrapChangeInvisibleDuration(messageView, invisibleDuration);
             final Metadata metadata = sign();
-            future = clientManager.changeInvisibleDuration(endpoints, 
metadata, request,
-                clientConfiguration.getRequestTimeout());
+            final ChangeInvisibleDurationRequest request = 
wrapChangeInvisibleDuration(messageView, invisibleDuration);
+            final Duration requestTimeout = 
clientConfiguration.getRequestTimeout();
+            future = 
this.getClientManager().changeInvisibleDuration(endpoints, metadata, request, 
requestTimeout);
         } catch (Throwable t) {
-            final 
SettableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> future0 = 
SettableFuture.create();
-            future0.setException(t);
-            future = future0;
+            future = Futures.immediateFailedFuture(t);
         }
         final MessageId messageId = messageView.getMessageId();
         Futures.addCallback(future, new 
FutureCallback<RpcInvocation<ChangeInvisibleDurationResponse>>() {
@@ -254,14 +250,14 @@ abstract class ConsumerImpl extends ClientImpl {
         return expressionBuilder.build();
     }
 
-    public ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, 
MessageQueueImpl mq,
+    ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, 
MessageQueueImpl mq,
         FilterExpression filterExpression) {
         return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
             
.setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
             .setBatchSize(batchSize).setAutoRenew(true).build();
     }
 
-    public ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, 
MessageQueueImpl mq,
+    ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, 
MessageQueueImpl mq,
         FilterExpression filterExpression, Duration invisibleDuration) {
         final com.google.protobuf.Duration duration = 
Durations.fromNanos(invisibleDuration.toNanos());
         return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index ac9814f..12f6f61 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -133,7 +133,7 @@ class ProcessQueueImpl implements ProcessQueue {
 
     @Override
     public boolean expired() {
-        final PushConsumerSettings settings = 
consumer.getPushConsumerSettings();
+        final PushSubscriptionSettings settings = 
consumer.getPushConsumerSettings();
         Duration maxIdleDuration = Duration.ofNanos(2 * 
(settings.getLongPollingTimeout().toNanos()
             + 
consumer.getClientConfiguration().getRequestTimeout().toNanos()));
         final Duration idleDuration = Duration.ofNanos(System.nanoTime() - 
activityNanoTime);
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index d4f329c..a1e4332 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -61,7 +61,7 @@ import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.java.exception.StatusChecker;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
-import org.apache.rocketmq.client.java.impl.ClientSettings;
+import org.apache.rocketmq.client.java.impl.Settings;
 import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
@@ -92,7 +92,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
     final AtomicLong consumptionErrorQuantity;
 
     private final ClientConfiguration clientConfiguration;
-    private final PushConsumerSettings pushConsumerSettings;
+    private final PushSubscriptionSettings pushSubscriptionSettings;
     private final String consumerGroup;
     private final Map<String /* topic */, FilterExpression> 
subscriptionExpressions;
     private final ConcurrentMap<String /* topic */, Assignments> 
cacheAssignments;
@@ -125,7 +125,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
         super(clientConfiguration, consumerGroup, 
subscriptionExpressions.keySet());
         this.clientConfiguration = clientConfiguration;
         Resource groupResource = new Resource(consumerGroup);
-        this.pushConsumerSettings = new PushConsumerSettings(clientId, 
endpoints, groupResource,
+        this.pushSubscriptionSettings = new PushSubscriptionSettings(clientId, 
endpoints, groupResource,
             clientConfiguration.getRequestTimeout(), subscriptionExpressions);
         this.consumerGroup = consumerGroup;
         this.subscriptionExpressions = subscriptionExpressions;
@@ -155,7 +155,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
             LOGGER.info("Begin to start the rocketmq push consumer, 
clientId={}", clientId);
             super.startUp();
             clientMeterProvider.setMessageCacheObserver(this);
-            final ScheduledExecutorService scheduler = 
clientManager.getScheduler();
+            final ScheduledExecutorService scheduler = 
this.getClientManager().getScheduler();
             this.consumeService = createConsumeService();
             this.consumeService.startAsync().awaitRunning();
             // Scan assignments periodically.
@@ -188,8 +188,8 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
     }
 
     private ConsumeService createConsumeService() {
-        final ScheduledExecutorService scheduler = 
clientManager.getScheduler();
-        if (pushConsumerSettings.isFifo()) {
+        final ScheduledExecutorService scheduler = 
this.getClientManager().getScheduler();
+        if (pushSubscriptionSettings.isFifo()) {
             return new FifoConsumeService(clientId, processQueueTable, 
messageListener,
                 consumptionExecutor, this, scheduler);
         }
@@ -205,8 +205,8 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
         return consumerGroup;
     }
 
-    public PushConsumerSettings getPushConsumerSettings() {
-        return pushConsumerSettings;
+    public PushSubscriptionSettings getPushConsumerSettings() {
+        return pushSubscriptionSettings;
     }
 
     /**
@@ -270,7 +270,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
                 final Metadata metadata = sign();
                 final QueryAssignmentRequest request = 
wrapQueryAssignmentRequest(topic);
                 final Duration requestTimeout = 
clientConfiguration.getRequestTimeout();
-                return clientManager.queryAssignment(endpoints, metadata, 
request, requestTimeout);
+                return this.getClientManager().queryAssignment(endpoints, 
metadata, request, requestTimeout);
             }, MoreExecutors.directExecutor());
         return Futures.transformAsync(responseFuture, invocation -> {
             final QueryAssignmentResponse response = invocation.getResponse();
@@ -415,8 +415,8 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
     }
 
     @Override
-    public ClientSettings getClientSettings() {
-        return pushConsumerSettings;
+    public Settings getSettings() {
+        return pushSubscriptionSettings;
     }
 
     /**
@@ -519,7 +519,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
             final ForwardMessageToDeadLetterQueueRequest request =
                 wrapForwardMessageToDeadLetterQueueRequest(messageView);
             final Metadata metadata = sign();
-            future = clientManager.forwardMessageToDeadLetterQueue(endpoints, 
metadata, request,
+            future = 
this.getClientManager().forwardMessageToDeadLetterQueue(endpoints, metadata, 
request,
                 clientConfiguration.getRequestTimeout());
         } catch (Throwable t) {
             future = Futures.immediateFailedFuture(t);
@@ -560,7 +560,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
     }
 
     public RetryPolicy getRetryPolicy() {
-        return pushConsumerSettings.getRetryPolicy();
+        return pushSubscriptionSettings.getRetryPolicy();
     }
 
     public ThreadPoolExecutor getConsumptionExecutor() {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
similarity index 83%
rename from 
java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
rename to 
java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
index 05bfb2f..5784517 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
@@ -19,11 +19,9 @@ package org.apache.rocketmq.client.java.impl.consumer;
 
 import apache.rocketmq.v2.FilterType;
 import apache.rocketmq.v2.RetryPolicy;
-import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.Subscription;
 import apache.rocketmq.v2.SubscriptionEntry;
 import com.google.common.base.MoreObjects;
-import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.util.Durations;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -31,8 +29,8 @@ import java.util.List;
 import java.util.Map;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
-import org.apache.rocketmq.client.java.impl.ClientSettings;
 import org.apache.rocketmq.client.java.impl.ClientType;
+import org.apache.rocketmq.client.java.impl.Settings;
 import org.apache.rocketmq.client.java.impl.UserAgent;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
@@ -41,8 +39,8 @@ import org.apache.rocketmq.client.java.route.Endpoints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PushConsumerSettings extends ClientSettings {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(PushConsumerSettings.class);
+public class PushSubscriptionSettings extends Settings {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PushSubscriptionSettings.class);
 
     private final Resource group;
     private final Map<String, FilterExpression> subscriptionExpressions;
@@ -50,9 +48,9 @@ public class PushConsumerSettings extends ClientSettings {
     private volatile int receiveBatchSize = 32;
     private volatile Duration longPollingTimeout = Duration.ofSeconds(30);
 
-    public PushConsumerSettings(String clientId, Endpoints accessPoint, 
Resource group, Duration requestTimeout,
-        Map<String, FilterExpression> subscriptionExpression) {
-        super(clientId, ClientType.PUSH_CONSUMER, accessPoint, requestTimeout);
+    public PushSubscriptionSettings(String clientId, Endpoints endpoints, 
Resource group,
+        Duration requestTimeout, Map<String, FilterExpression> 
subscriptionExpression) {
+        super(clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout);
         this.group = group;
         this.subscriptionExpressions = subscriptionExpression;
     }
@@ -70,7 +68,7 @@ public class PushConsumerSettings extends ClientSettings {
     }
 
     @Override
-    public Settings toProtobuf() {
+    public apache.rocketmq.v2.Settings toProtobuf() {
         List<SubscriptionEntry> subscriptionEntries = new ArrayList<>();
         for (Map.Entry<String, FilterExpression> entry : 
subscriptionExpressions.entrySet()) {
             final FilterExpression filterExpression = entry.getValue();
@@ -95,15 +93,15 @@ public class PushConsumerSettings extends ClientSettings {
         }
         Subscription subscription =
             
Subscription.newBuilder().setGroup(group.toProtobuf()).addAllSubscriptions(subscriptionEntries).build();
-        return 
Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
-            
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setSubscription(subscription)
-            .setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
+        return 
apache.rocketmq.v2.Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf())
+            
.setClientType(clientType.toProtobuf()).setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos()))
+            
.setSubscription(subscription).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
     }
 
     @Override
-    public void applySettingsCommand(Settings settings) {
-        final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
-        if (!Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
+    public void sync(apache.rocketmq.v2.Settings settings) {
+        final apache.rocketmq.v2.Settings.PubSubCase pubSubCase = 
settings.getPubSubCase();
+        if 
(!apache.rocketmq.v2.Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
             LOGGER.error("[Bug] Issued settings not match with the client 
type, clientId={}, pubSubCase={}, "
                 + "clientType={}", clientId, pubSubCase, clientType);
             return;
@@ -123,7 +121,6 @@ public class PushConsumerSettings extends ClientSettings {
             default:
                 throw new IllegalArgumentException("Unrecognized backoff 
policy strategy.");
         }
-        this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
     }
 
     @Override
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index e5f9618..bb1500d 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -43,7 +43,7 @@ import 
org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
 import org.apache.rocketmq.client.apis.message.MessageView;
 import org.apache.rocketmq.client.java.exception.StatusChecker;
-import org.apache.rocketmq.client.java.impl.ClientSettings;
+import org.apache.rocketmq.client.java.impl.Settings;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
 class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleConsumerImpl.class);
 
-    private final SimpleConsumerSettings simpleConsumerSettings;
+    private final SimpleSubscriptionSettings simpleSubscriptionSettings;
     private final String consumerGroup;
     private final Duration awaitDuration;
 
@@ -72,8 +72,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements 
SimpleConsumer {
         Map<String, FilterExpression> subscriptionExpressions) {
         super(clientConfiguration, consumerGroup, 
subscriptionExpressions.keySet());
         Resource groupResource = new Resource(consumerGroup);
-        this.simpleConsumerSettings = new SimpleConsumerSettings(clientId, 
endpoints, groupResource,
-            clientConfiguration.getRequestTimeout(), awaitDuration, 
subscriptionExpressions);
+        this.simpleSubscriptionSettings = new 
SimpleSubscriptionSettings(clientId, endpoints,
+            groupResource, clientConfiguration.getRequestTimeout(), 
awaitDuration, subscriptionExpressions);
         this.consumerGroup = consumerGroup;
         this.awaitDuration = awaitDuration;
 
@@ -294,8 +294,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements 
SimpleConsumer {
     }
 
     @Override
-    public ClientSettings getClientSettings() {
-        return simpleConsumerSettings;
+    public Settings getSettings() {
+        return simpleSubscriptionSettings;
     }
 
     public void onTopicRouteDataUpdate0(String topic, TopicRouteData 
topicRouteData) {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
similarity index 76%
rename from 
java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
rename to 
java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
index 93816c8..0d4b9f4 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
@@ -18,11 +18,9 @@
 package org.apache.rocketmq.client.java.impl.consumer;
 
 import apache.rocketmq.v2.FilterType;
-import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.Subscription;
 import apache.rocketmq.v2.SubscriptionEntry;
 import com.google.common.base.MoreObjects;
-import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.util.Durations;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -30,36 +28,36 @@ import java.util.List;
 import java.util.Map;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
-import org.apache.rocketmq.client.java.impl.ClientSettings;
 import org.apache.rocketmq.client.java.impl.ClientType;
+import org.apache.rocketmq.client.java.impl.Settings;
 import org.apache.rocketmq.client.java.impl.UserAgent;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SimpleConsumerSettings extends ClientSettings {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleConsumerSettings.class);
+public class SimpleSubscriptionSettings extends Settings {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleSubscriptionSettings.class);
 
     private final Resource group;
     private final Duration longPollingTimeout;
     private final Map<String, FilterExpression> subscriptionExpressions;
 
-    public SimpleConsumerSettings(String clientId, Endpoints accessPoint, 
Resource group, Duration requestTimeout,
-        Duration longPollingTimeout, Map<String, FilterExpression> 
subscriptionExpression) {
-        super(clientId, ClientType.SIMPLE_CONSUMER, accessPoint, 
requestTimeout);
+    public SimpleSubscriptionSettings(String clientId, Endpoints endpoints, 
Resource group,
+        Duration requestTimeout, Duration longPollingTimeout, Map<String, 
FilterExpression> subscriptionExpression) {
+        super(clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout);
         this.group = group;
         this.subscriptionExpressions = subscriptionExpression;
         this.longPollingTimeout = longPollingTimeout;
     }
 
     @Override
-    public Settings toProtobuf() {
+    public apache.rocketmq.v2.Settings toProtobuf() {
         List<SubscriptionEntry> subscriptionEntries = new ArrayList<>();
         for (Map.Entry<String, FilterExpression> entry : 
subscriptionExpressions.entrySet()) {
             final FilterExpression filterExpression = entry.getValue();
-            apache.rocketmq.v2.Resource topic =
-                
apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
+            apache.rocketmq.v2.Resource topic = 
apache.rocketmq.v2.Resource.newBuilder()
+                .setName(entry.getKey()).build();
             final apache.rocketmq.v2.FilterExpression.Builder 
expressionBuilder =
                 
apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
             final FilterExpressionType type = 
filterExpression.getFilterExpressionType();
@@ -80,20 +78,18 @@ public class SimpleConsumerSettings extends ClientSettings {
         Subscription subscription = 
Subscription.newBuilder().setGroup(group.toProtobuf())
             
.setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
             .addAllSubscriptions(subscriptionEntries).build();
-        return 
Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
-            
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setSubscription(subscription)
-            .setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
+        return 
apache.rocketmq.v2.Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf())
+            
.setClientType(clientType.toProtobuf()).setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos()))
+            
.setSubscription(subscription).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
     }
 
     @Override
-    public void applySettingsCommand(Settings settings) {
-        final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
-        if (!Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
+    public void sync(apache.rocketmq.v2.Settings settings) {
+        final apache.rocketmq.v2.Settings.PubSubCase pubSubCase = 
settings.getPubSubCase();
+        if 
(!apache.rocketmq.v2.Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
             LOGGER.error("[Bug] Issued settings not match with the client 
type, clientId={}, pubSubCase={}, "
                 + "clientType={}", clientId, pubSubCase, clientType);
-            return;
         }
-        this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
     }
 
     @Override
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
index 231de9c..cf7743c 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
@@ -22,7 +22,6 @@ import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
 import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.TelemetryCommand;
 import apache.rocketmq.v2.VerifyMessageCommand;
-import com.google.common.util.concurrent.ListenableFuture;
 import io.grpc.stub.StreamObserver;
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.rocketmq.client.apis.ClientException;
@@ -47,11 +46,6 @@ public interface ClientSessionHandler {
      */
     boolean isEndpointsDeprecated(Endpoints endpoints);
 
-    /**
-     * Await the settings to be synchronized with the server.
-     */
-    ListenableFuture<Void> awaitSettingSynchronized();
-
     /**
      * Indicates the client identifier.
      *
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 59a2bd9..39c9391 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -64,7 +64,7 @@ import 
org.apache.rocketmq.client.java.exception.TooManyRequestsException;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.impl.ClientImpl;
-import org.apache.rocketmq.client.java.impl.ClientSettings;
+import org.apache.rocketmq.client.java.impl.Settings;
 import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.MessageType;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
@@ -87,7 +87,7 @@ import org.slf4j.LoggerFactory;
 class ProducerImpl extends ClientImpl implements Producer {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProducerImpl.class);
 
-    protected final ProducerSettings producerSettings;
+    protected final PublishingSettings publishingSettings;
     final ConcurrentMap<String/* topic */, PublishingLoadBalancer> 
publishingRouteDataCache;
     private final TransactionChecker checker;
 
@@ -99,7 +99,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         TransactionChecker checker) {
         super(clientConfiguration, topics);
         ExponentialBackoffRetryPolicy retryPolicy = 
ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
-        this.producerSettings = new ProducerSettings(clientId, endpoints, 
retryPolicy,
+        this.publishingSettings = new PublishingSettings(clientId, endpoints, 
retryPolicy,
             clientConfiguration.getRequestTimeout(), topics);
         this.checker = checker;
         this.publishingRouteDataCache = new ConcurrentHashMap<>();
@@ -177,8 +177,8 @@ class ProducerImpl extends ClientImpl implements Producer {
     }
 
     @Override
-    public ClientSettings getClientSettings() {
-        return producerSettings;
+    public Settings getSettings() {
+        return publishingSettings;
     }
 
     @Override
@@ -216,8 +216,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         } catch (Throwable t) {
             throw new ClientException(t);
         }
-        final ListenableFuture<List<SendReceiptImpl>> future = 
send(Collections.singletonList(publishingMessage),
-            true);
+        final ListenableFuture<List<SendReceiptImpl>> future = 
send(Collections.singletonList(publishingMessage), true);
         final List<SendReceiptImpl> receipts = handleClientFuture(future);
         final SendReceiptImpl sendReceipt = receipts.iterator().next();
         ((TransactionImpl) transaction).tryAddReceipt(publishingMessage, 
sendReceipt);
@@ -282,7 +281,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         doBefore(messageHookPoints, messageCommons);
 
         final ListenableFuture<RpcInvocation<EndTransactionResponse>> future =
-            clientManager.endTransaction(endpoints, metadata, request, 
requestTimeout);
+            this.getClientManager().endTransaction(endpoints, metadata, 
request, requestTimeout);
         Futures.addCallback(future, new 
FutureCallback<RpcInvocation<EndTransactionResponse>>() {
             @Override
             public void onSuccess(RpcInvocation<EndTransactionResponse> 
invocation) {
@@ -318,7 +317,7 @@ class ProducerImpl extends ClientImpl implements Producer {
     }
 
     private RetryPolicy getRetryPolicy() {
-        return producerSettings.getRetryPolicy();
+        return publishingSettings.getRetryPolicy();
     }
 
     /**
@@ -343,7 +342,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         List<PublishingMessageImpl> pubMessages = new ArrayList<>();
         for (Message message : messages) {
             try {
-                final PublishingMessageImpl pubMessage = new 
PublishingMessageImpl(message, producerSettings,
+                final PublishingMessageImpl pubMessage = new 
PublishingMessageImpl(message, publishingSettings,
                     txEnabled);
                 pubMessages.add(pubMessage);
             } catch (Throwable t) {
@@ -428,7 +427,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
         final SendMessageRequest request = wrapSendMessageRequest(pubMessages, 
mq);
         final ListenableFuture<RpcInvocation<SendMessageResponse>> future0 =
-            clientManager.sendMessage(endpoints, metadata, request, 
clientConfiguration.getRequestTimeout());
+            this.getClientManager().sendMessage(endpoints, metadata, request, 
clientConfiguration.getRequestTimeout());
         return Futures.transformAsync(future0,
             invocation -> 
Futures.immediateFuture(SendReceiptImpl.processResponseInvocation(mq, 
invocation)),
             MoreExecutors.directExecutor());
@@ -447,7 +446,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         // Calculate the current message queue.
         final MessageQueueImpl mq = candidates.get(IntMath.mod(attempt - 1, 
candidates.size()));
         final List<MessageType> acceptMessageTypes = 
mq.getAcceptMessageTypes();
-        if (producerSettings.isValidateMessageType() && 
!acceptMessageTypes.contains(messageType)) {
+        if (publishingSettings.isValidateMessageType() && 
!acceptMessageTypes.contains(messageType)) {
             final IllegalArgumentException e = new 
IllegalArgumentException("Current message type not match with "
                 + "topic accept message types, topic=" + topic + ", 
actualMessageType=" + messageType + ", "
                 + "acceptMessageTypes=" + acceptMessageTypes);
@@ -536,8 +535,8 @@ class ProducerImpl extends ClientImpl implements Producer {
                 LOGGER.warn("Failed to send message due to too many requests, 
would attempt to resend after {}, "
                         + "maxAttempts={}, attempt={}, topic={}, 
messageId(s)={}, endpoints={}, clientId={}", delay,
                     maxAttempts, attempt, topic, messageIds, endpoints, 
clientId, t);
-                clientManager.getScheduler().schedule(() -> send0(future0, 
topic, messageType, candidates, messages,
-                    nextAttempt), delay.toNanos(), TimeUnit.NANOSECONDS);
+                
ProducerImpl.this.getClientManager().getScheduler().schedule(() -> 
send0(future0, topic, messageType,
+                    candidates, messages, nextAttempt), delay.toNanos(), 
TimeUnit.NANOSECONDS);
             }
         }, clientCallbackExecutor);
     }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
similarity index 84%
rename from 
java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
rename to 
java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
index 4919927..2649ea9 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
@@ -19,15 +19,13 @@ package org.apache.rocketmq.client.java.impl.producer;
 
 import apache.rocketmq.v2.Publishing;
 import apache.rocketmq.v2.Resource;
-import apache.rocketmq.v2.Settings;
 import com.google.common.base.MoreObjects;
-import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.util.Durations;
 import java.time.Duration;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.rocketmq.client.java.impl.ClientSettings;
 import org.apache.rocketmq.client.java.impl.ClientType;
+import org.apache.rocketmq.client.java.impl.Settings;
 import org.apache.rocketmq.client.java.impl.UserAgent;
 import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
@@ -35,8 +33,8 @@ import org.apache.rocketmq.client.java.route.Endpoints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ProducerSettings extends ClientSettings {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProducerSettings.class);
+public class PublishingSettings extends Settings {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PublishingSettings.class);
 
     private final Set<String> topics;
     /**
@@ -45,7 +43,7 @@ public class ProducerSettings extends ClientSettings {
     private volatile int maxBodySizeBytes = 4 * 1024 * 1024;
     private volatile boolean validateMessageType = true;
 
-    public ProducerSettings(String clientId, Endpoints accessPoint, 
ExponentialBackoffRetryPolicy retryPolicy,
+    public PublishingSettings(String clientId, Endpoints accessPoint, 
ExponentialBackoffRetryPolicy retryPolicy,
         Duration requestTimeout, Set<String> topics) {
         super(clientId, ClientType.PRODUCER, accessPoint, retryPolicy, 
requestTimeout);
         this.topics = topics;
@@ -60,20 +58,20 @@ public class ProducerSettings extends ClientSettings {
     }
 
     @Override
-    public Settings toProtobuf() {
+    public apache.rocketmq.v2.Settings toProtobuf() {
         final Publishing publishing = Publishing.newBuilder()
             .addAllTopics(topics.stream().map(name -> 
Resource.newBuilder().setName(name).build())
                 
.collect(Collectors.toList())).setValidateMessageType(validateMessageType).build();
-        final Settings.Builder builder = Settings.newBuilder()
+        final apache.rocketmq.v2.Settings.Builder builder = 
apache.rocketmq.v2.Settings.newBuilder()
             
.setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
             
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
         return 
builder.setBackoffPolicy(retryPolicy.toProtobuf()).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
     }
 
     @Override
-    public void applySettingsCommand(Settings settings) {
-        final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
-        if (!Settings.PubSubCase.PUBLISHING.equals(pubSubCase)) {
+    public void sync(apache.rocketmq.v2.Settings settings) {
+        final apache.rocketmq.v2.Settings.PubSubCase pubSubCase = 
settings.getPubSubCase();
+        if 
(!apache.rocketmq.v2.Settings.PubSubCase.PUBLISHING.equals(pubSubCase)) {
             LOGGER.error("[Bug] Issued settings not match with the client 
type, clientId={}, pubSubCase={}, "
                 + "clientType={}", clientId, pubSubCase, clientType);
             return;
@@ -84,7 +82,6 @@ public class ProducerSettings extends ClientSettings {
         this.retryPolicy = exist.inheritBackoff(backoffPolicy);
         this.validateMessageType = 
settings.getPublishing().getValidateMessageType();
         this.maxBodySizeBytes = publishing.getMaxBodySize();
-        this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
     }
 
     @Override
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
index aef1098..986e8b4 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
@@ -31,11 +31,8 @@ import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.Transaction;
 import org.apache.rocketmq.client.apis.producer.TransactionResolution;
 import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 class TransactionImpl implements Transaction {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProducerImpl.class);
     private static final int MAX_MESSAGE_NUM = 1;
 
     private final ProducerImpl producerImpl;
@@ -54,7 +51,7 @@ class TransactionImpl implements Transaction {
     public PublishingMessageImpl tryAddMessage(Message message) throws 
IOException {
         messagesLock.readLock().lock();
         try {
-            if (messages.size() > MAX_MESSAGE_NUM) {
+            if (messages.size() >= MAX_MESSAGE_NUM) {
                 throw new IllegalArgumentException("Message in transaction has 
exceeded the threshold: " +
                     MAX_MESSAGE_NUM);
             }
@@ -68,7 +65,7 @@ class TransactionImpl implements Transaction {
                     MAX_MESSAGE_NUM);
             }
             final PublishingMessageImpl publishingMessage = new 
PublishingMessageImpl(message,
-                producerImpl.producerSettings, true);
+                producerImpl.publishingSettings, true);
             messages.add(publishingMessage);
             return publishingMessage;
         } finally {
@@ -80,8 +77,7 @@ class TransactionImpl implements Transaction {
         messagesLock.readLock().lock();
         try {
             if (!messages.contains(publishingMessage)) {
-                LOGGER.warn("message(s) is not contained in current 
transaction");
-                return;
+                throw new IllegalArgumentException("Message not in 
transaction");
             }
             messageSendReceiptMap.put(publishingMessage, sendReceipt);
         } finally {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 5edfab1..306ff5c 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -25,7 +25,7 @@ import java.io.IOException;
 import java.util.Optional;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.message.MessageId;
-import org.apache.rocketmq.client.java.impl.producer.ProducerSettings;
+import org.apache.rocketmq.client.java.impl.producer.PublishingSettings;
 import org.apache.rocketmq.client.java.message.protocol.Encoding;
 import org.apache.rocketmq.client.java.misc.Utilities;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
@@ -39,12 +39,12 @@ public class PublishingMessageImpl extends MessageImpl {
     private final MessageType messageType;
     private volatile String traceContext;
 
-    public PublishingMessageImpl(Message message, ProducerSettings 
producerSettings, boolean txEnabled)
+    public PublishingMessageImpl(Message message, PublishingSettings 
publishingSettings, boolean txEnabled)
         throws IOException {
         super(message);
         this.traceContext = null;
         final int length = message.getBody().remaining();
-        final int maxBodySizeBytes = producerSettings.getMaxBodySizeBytes();
+        final int maxBodySizeBytes = publishingSettings.getMaxBodySizeBytes();
         if (length > maxBodySizeBytes) {
             throw new IOException("Message body size exceeds the threshold, 
max size=" + maxBodySizeBytes + " bytes");
         }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
index 6b76d78..d0297eb 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
@@ -22,8 +22,8 @@ import com.google.common.base.Objects;
 import org.apache.commons.lang3.StringUtils;
 
 public class Resource {
-    private String namespace;
-    private String name;
+    private final String namespace;
+    private final String name;
 
     public Resource(String namespace, String name) {
         this.namespace = namespace;
@@ -52,14 +52,6 @@ public class Resource {
         return this.name;
     }
 
-    public void setNamespace(String namespace) {
-        this.namespace = namespace;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java 
b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
index d036c14..ef8803f 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
@@ -56,9 +56,9 @@ public class Utilities {
 
     private static final String HOST_NAME_NOT_FOUND = "HOST_NAME_NOT_FOUND";
 
-    private static String protocolVersion = null;
-    private static String hostName = null;
-    private static byte[] macAddress = null;
+    private static final ThreadLocal<String> PROTOCOL_VERSION_THREAD_LOCAL = 
new ThreadLocal<>();
+    private static final ThreadLocal<String> HOST_NAME_THREAD_LOCAL = new 
ThreadLocal<>();
+    private static final ThreadLocal<byte[]> MAC_ADDRESS_THREAD_LOCAL = new 
ThreadLocal<>();
 
     /**
      * Used to build output as Hex
@@ -78,6 +78,7 @@ public class Utilities {
     }
 
     public static byte[] macAddress() {
+        byte[] macAddress = MAC_ADDRESS_THREAD_LOCAL.get();
         if (null != macAddress) {
             return macAddress.clone();
         }
@@ -90,6 +91,7 @@ public class Utilities {
                     continue;
                 }
                 macAddress = mac;
+                MAC_ADDRESS_THREAD_LOCAL.set(macAddress);
                 return macAddress.clone();
             }
         } catch (Throwable ignore) {
@@ -98,14 +100,17 @@ public class Utilities {
         byte[] randomBytes = new byte[6];
         RANDOM.nextBytes(randomBytes);
         macAddress = randomBytes;
+        MAC_ADDRESS_THREAD_LOCAL.set(macAddress);
         return macAddress.clone();
     }
 
     public static String getProtocolVersion() {
+        String protocolVersion = PROTOCOL_VERSION_THREAD_LOCAL.get();
         if (null != protocolVersion) {
             return protocolVersion;
         }
         protocolVersion = 
ReceiveMessageRequest.class.getName().split("\\.")[2];
+        PROTOCOL_VERSION_THREAD_LOCAL.set(protocolVersion);
         return protocolVersion;
     }
 
@@ -125,14 +130,17 @@ public class Utilities {
     }
 
     public static String hostName() {
+        String hostName = HOST_NAME_THREAD_LOCAL.get();
         if (null != hostName) {
             return hostName;
         }
         try {
             hostName = InetAddress.getLocalHost().getHostName();
+            HOST_NAME_THREAD_LOCAL.set(hostName);
             return hostName;
         } catch (Throwable ignore) {
             hostName = HOST_NAME_NOT_FOUND;
+            HOST_NAME_THREAD_LOCAL.set(hostName);
             return hostName;
         }
     }
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
index 6a240cc..5f84699 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
@@ -33,12 +33,15 @@ import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class ClientManagerImplTest extends TestBase {
-    private static final ClientManagerImpl CLIENT_MANAGER = new 
ClientManagerImpl(null);
+    private static final Client CLIENT = Mockito.mock(Client.class);
+    private static final ClientManagerImpl CLIENT_MANAGER = new 
ClientManagerImpl(CLIENT);
 
     @BeforeClass
     public static void setUp() {
+        Mockito.when(CLIENT.clientId()).thenReturn("clientId");
         CLIENT_MANAGER.startAsync().awaitRunning();
     }
 
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java
new file mode 100644
index 0000000..8dd7be5
--- /dev/null
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.time.Duration;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
+import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ConsumeServiceTest extends TestBase {
+    private final String clientId = "clientId";
+    private final ConcurrentMap<MessageQueueImpl, ProcessQueue> table = new 
ConcurrentHashMap<>();
+    private final MessageInterceptor interceptor = 
Mockito.mock(MessageInterceptor.class);
+    private final ThreadPoolExecutor consumptionExecutor = new 
ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<>(), new 
ThreadFactoryImpl("TestMessageConsumption"));
+    private final ScheduledExecutorService scheduler = new 
ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
+        "TestScheduler"));
+
+
+    @Test
+    public void testConsumeSuccess() throws ExecutionException, 
InterruptedException, TimeoutException {
+        final MessageListener messageListener = messageView -> 
ConsumeResult.SUCCESS;
+        final ConsumeService consumeService = new ConsumeService(clientId, 
table, messageListener,
+            consumptionExecutor, interceptor, scheduler) {
+            @Override
+            public void dispatch() {
+            }
+        };
+        final MessageViewImpl messageView = fakeMessageViewImpl();
+        final ListenableFuture<ConsumeResult> future = 
consumeService.consume(messageView);
+        final ConsumeResult consumeResult = future.get(1000, 
TimeUnit.MILLISECONDS);
+        assertEquals(ConsumeResult.SUCCESS, consumeResult);
+    }
+
+    @Test
+    public void testConsumeFailure() throws ExecutionException, 
InterruptedException, TimeoutException {
+        final MessageListener messageListener = messageView -> 
ConsumeResult.FAILURE;
+        final ConsumeService consumeService = new ConsumeService(clientId, 
table, messageListener,
+            consumptionExecutor, interceptor, scheduler) {
+            @Override
+            public void dispatch() {
+            }
+        };
+        final MessageViewImpl messageView = fakeMessageViewImpl();
+        final ListenableFuture<ConsumeResult> future = 
consumeService.consume(messageView);
+        final ConsumeResult consumeResult = future.get(1000, 
TimeUnit.MILLISECONDS);
+        assertEquals(ConsumeResult.FAILURE, consumeResult);
+    }
+
+    @Test
+    public void testConsumeWithException() throws ExecutionException, 
InterruptedException, TimeoutException {
+        final MessageListener messageListener = messageView -> {
+            throw new RuntimeException();
+        };
+        final ConsumeService consumeService = new ConsumeService(clientId, 
table, messageListener,
+            consumptionExecutor, interceptor, scheduler) {
+            @Override
+            public void dispatch() {
+            }
+        };
+        final MessageViewImpl messageView = fakeMessageViewImpl();
+        final ListenableFuture<ConsumeResult> future = 
consumeService.consume(messageView);
+        final ConsumeResult consumeResult = future.get(1000, 
TimeUnit.MILLISECONDS);
+        assertEquals(ConsumeResult.FAILURE, consumeResult);
+    }
+
+    @Test
+    public void testConsumeWithDelay() throws ExecutionException, 
InterruptedException {
+        final MessageListener messageListener = messageView -> 
ConsumeResult.SUCCESS;
+        final ConsumeService consumeService = new ConsumeService(clientId, 
table, messageListener,
+            consumptionExecutor, interceptor, scheduler) {
+            @Override
+            public void dispatch() {
+            }
+        };
+        final MessageViewImpl messageView = fakeMessageViewImpl();
+        final ListenableFuture<ConsumeResult> future = 
consumeService.consume(messageView, Duration.ofMillis(500));
+        final ConsumeResult consumeResult = future.get();
+        assertEquals(ConsumeResult.SUCCESS, consumeResult);
+    }
+}
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java
new file mode 100644
index 0000000..3cc63f0
--- /dev/null
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ConsumeTaskTest extends TestBase {
+
+    @Test
+    public void testCallWithConsumeSuccess() {
+        String clientId = "foobar";
+        final MessageViewImpl messageView = fakeMessageViewImpl();
+        final MessageListener messageListener = 
Mockito.mock(MessageListener.class);
+        
Mockito.when(messageListener.consume(messageView)).thenReturn(ConsumeResult.SUCCESS);
+        final MessageInterceptor messageInterceptor = 
Mockito.mock(MessageInterceptor.class);
+        final ConsumeTask consumeTask = new ConsumeTask(clientId, 
messageListener, messageView, messageInterceptor);
+        final ConsumeResult consumeResult = consumeTask.call();
+        assertEquals(ConsumeResult.SUCCESS, consumeResult);
+    }
+
+    @Test
+    public void testCallWithConsumeException() {
+        String clientId = "foobar";
+        final MessageViewImpl messageView = fakeMessageViewImpl();
+        final MessageListener messageListener = 
Mockito.mock(MessageListener.class);
+        Mockito.when(messageListener.consume(messageView)).thenThrow(new 
RuntimeException());
+        final MessageInterceptor messageInterceptor = 
Mockito.mock(MessageInterceptor.class);
+        final ConsumeTask consumeTask = new ConsumeTask(clientId, 
messageListener, messageView, messageInterceptor);
+        final ConsumeResult consumeResult = consumeTask.call();
+        assertEquals(ConsumeResult.FAILURE, consumeResult);
+    }
+}
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
new file mode 100644
index 0000000..b115fb2
--- /dev/null
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+
+import apache.rocketmq.v2.AckMessageRequest;
+import apache.rocketmq.v2.AckMessageResponse;
+import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
+import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
+import apache.rocketmq.v2.ReceiveMessageRequest;
+import apache.rocketmq.v2.ReceiveMessageResponse;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.Metadata;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.java.impl.ClientManager;
+import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConsumerImplTest extends TestBase {
+    private final Map<String, FilterExpression> subscriptionExpressions = 
createSubscriptionExpressions(FAKE_TOPIC_0);
+    private final MessageListener messageListener = messageView -> 
ConsumeResult.SUCCESS;
+    private final ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
+        .setEndpoints(FAKE_ENDPOINTS).build();
+
+    @Test
+    public void testReceiveMessage() throws ExecutionException, 
InterruptedException {
+        int maxCacheMessageCount = 8;
+        int maxCacheMessageSizeInBytes = 1024;
+        int consumptionThreadCount = 4;
+        PushConsumerImpl pushConsumer = Mockito.spy(new 
PushConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0,
+            subscriptionExpressions, messageListener, maxCacheMessageCount, 
maxCacheMessageSizeInBytes,
+            consumptionThreadCount));
+        final ClientManager clientManager = Mockito.mock(ClientManager.class);
+        Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
+        int receivedMessageCount = 1;
+        final 
ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> future =
+            okReceiveMessageResponsesFuture(FAKE_TOPIC_0, 
receivedMessageCount);
+        
Mockito.doReturn(future).when(clientManager).receiveMessage(any(Endpoints.class),
 any(Metadata.class),
+            any(ReceiveMessageRequest.class), any(Duration.class));
+        final MessageQueueImpl mq = fakeMessageQueueImpl(FAKE_TOPIC_0);
+        final ReceiveMessageRequest request = 
pushConsumer.wrapReceiveMessageRequest(1,
+            mq, new FilterExpression(), Duration.ofSeconds(15));
+        final ListenableFuture<ReceiveMessageResult> future0 =
+            pushConsumer.receiveMessage(request, mq, Duration.ofSeconds(15));
+        final ReceiveMessageResult receiveMessageResult = future0.get();
+        Assert.assertEquals(receiveMessageResult.getMessageViews().size(), 
receivedMessageCount);
+    }
+
+    @Test
+    public void testAckMessage() throws ExecutionException, 
InterruptedException {
+        int maxCacheMessageCount = 8;
+        int maxCacheMessageSizeInBytes = 1024;
+        int consumptionThreadCount = 4;
+        PushConsumerImpl pushConsumer = Mockito.spy(new 
PushConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0,
+            subscriptionExpressions, messageListener, maxCacheMessageCount, 
maxCacheMessageSizeInBytes,
+            consumptionThreadCount));
+        final ClientManager clientManager = Mockito.mock(ClientManager.class);
+        Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
+        final ListenableFuture<RpcInvocation<AckMessageResponse>> future =
+            okAckMessageResponseFuture();
+        
Mockito.doReturn(future).when(clientManager).ackMessage(any(Endpoints.class), 
any(Metadata.class),
+            any(AckMessageRequest.class), any(Duration.class));
+        final MessageViewImpl messageView = fakeMessageViewImpl();
+        final ListenableFuture<RpcInvocation<AckMessageResponse>> future0 =
+            pushConsumer.ackMessage(messageView);
+        final RpcInvocation<AckMessageResponse> rpcInvocation = future0.get();
+        Assert.assertEquals(rpcInvocation, future.get());
+    }
+
+    @Test
+    public void testChangeInvisibleDuration() throws ExecutionException, 
InterruptedException {
+        int maxCacheMessageCount = 8;
+        int maxCacheMessageSizeInBytes = 1024;
+        int consumptionThreadCount = 4;
+        PushConsumerImpl pushConsumer = Mockito.spy(new 
PushConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0,
+            subscriptionExpressions, messageListener, maxCacheMessageCount, 
maxCacheMessageSizeInBytes,
+            consumptionThreadCount));
+        final ClientManager clientManager = Mockito.mock(ClientManager.class);
+        Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
+        final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
future =
+            okChangeInvisibleDurationCtxFuture();
+        
Mockito.doReturn(future).when(clientManager).changeInvisibleDuration(any(Endpoints.class),
 any(Metadata.class),
+            any(ChangeInvisibleDurationRequest.class), any(Duration.class));
+        final MessageViewImpl messageView = fakeMessageViewImpl();
+        final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
future0 =
+            pushConsumer.changeInvisibleDuration(messageView, 
Duration.ofSeconds(15));
+        final RpcInvocation<ChangeInvisibleDurationResponse> rpcInvocation = 
future0.get();
+        Assert.assertEquals(rpcInvocation, future.get());
+    }
+
+}
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeServiceTest.java
similarity index 68%
copy from 
java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
copy to 
java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeServiceTest.java
index e1298bd..19eb29b 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeServiceTest.java
@@ -24,8 +24,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
@@ -37,14 +37,12 @@ import 
org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.tool.TestBase;
-import org.junit.Ignore;
 import org.junit.Test;
 
-public class StandardConsumeServiceTest extends TestBase {
+public class FifoConsumeServiceTest extends TestBase {
 
     @Test
-    @Ignore
-    public void testDispatch0() {
+    public void testDispatch() throws InterruptedException {
         final ProcessQueue processQueue0 = mock(ProcessQueue.class);
         final ProcessQueue processQueue1 = mock(ProcessQueue.class);
 
@@ -55,14 +53,20 @@ public class StandardConsumeServiceTest extends TestBase {
         processQueueTable.put(messageQueue0, processQueue0);
         processQueueTable.put(messageQueue1, processQueue1);
 
-        final MessageViewImpl messageView0 = 
fakeMessageViewImpl(messageQueue0);
-        final MessageViewImpl messageView1 = 
fakeMessageViewImpl(messageQueue1);
+        final MessageViewImpl messageView00 = 
fakeMessageViewImpl(messageQueue0);
+        final MessageViewImpl messageView01 = 
fakeMessageViewImpl(messageQueue0);
+        List<MessageViewImpl> messageViewList0 = new ArrayList<>();
+        messageViewList0.add(messageView00);
+        messageViewList0.add(messageView01);
 
-        
when(processQueue0.tryTakeMessage()).thenReturn(Optional.of(messageView0));
-        
when(processQueue1.tryTakeMessage()).thenReturn(Optional.of(messageView1));
+        final MessageViewImpl messageView10 = 
fakeMessageViewImpl(messageQueue1);
+        List<MessageViewImpl> messageViewList1 = new ArrayList<>();
+        messageViewList1.add(messageView10);
 
-        MessageListener listener = messageView -> ConsumeResult.SUCCESS;
+        
when(processQueue0.tryTakeFifoMessages()).thenReturn(messageViewList0.iterator());
+        
when(processQueue1.tryTakeFifoMessages()).thenReturn(messageViewList1.iterator());
 
+        MessageListener listener = messageView -> ConsumeResult.SUCCESS;
         MessageInterceptor interceptor = new MessageInterceptor() {
             @Override
             public void doBefore(MessageHookPoints messageHookPoints, 
List<MessageCommon> messageCommons) {
@@ -73,10 +77,14 @@ public class StandardConsumeServiceTest extends TestBase {
                 Duration duration, MessageHookPointsStatus status) {
             }
         };
-        final StandardConsumeService service = new 
StandardConsumeService(FAKE_CLIENT_ID, processQueueTable, listener,
+        final FifoConsumeService service = new 
FifoConsumeService(FAKE_CLIENT_ID, processQueueTable, listener,
             SINGLE_THREAD_POOL_EXECUTOR, interceptor, SCHEDULER);
-        service.dispatch0();
-        verify(processQueue0, 
times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
-        verify(processQueue0, 
times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
+        service.dispatch();
+        Thread.sleep(1000);
+        verify(processQueue0, times(1)).tryTakeFifoMessages();
+        verify(processQueue1, times(1)).tryTakeFifoMessages();
+        verify(processQueue0, 
times(2)).eraseFifoMessage(any(MessageViewImpl.class), 
any(ConsumeResult.class));
+        verify(processQueue1, 
times(1)).eraseFifoMessage(any(MessageViewImpl.class), 
any(ConsumeResult.class));
+
     }
 }
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 7af67d5..bc92717 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -67,7 +67,7 @@ public class ProcessQueueImplTest extends TestBase {
     @Mock
     private ConsumeService consumeService;
     @Mock
-    private PushConsumerSettings pushConsumerSettings;
+    private PushSubscriptionSettings pushSubscriptionSettings;
     @Mock
     private RetryPolicy retryPolicy;
 
@@ -94,7 +94,7 @@ public class ProcessQueueImplTest extends TestBase {
         field1.setAccessible(true);
         field1.set(pushConsumer, consumptionErrorQuantity);
 
-        
when(pushConsumer.getPushConsumerSettings()).thenReturn(pushConsumerSettings);
+        
when(pushConsumer.getPushConsumerSettings()).thenReturn(pushSubscriptionSettings);
         when(pushConsumer.getScheduler()).thenReturn(SCHEDULER);
 
         this.receivedMessagesQuantity = new AtomicLong(0);
@@ -104,9 +104,9 @@ public class ProcessQueueImplTest extends TestBase {
 
     @Test
     public void testExpired() {
-        
when(pushConsumerSettings.getLongPollingTimeout()).thenReturn(Duration.ofSeconds(3));
+        
when(pushSubscriptionSettings.getLongPollingTimeout()).thenReturn(Duration.ofSeconds(3));
         
when(pushConsumer.getClientConfiguration()).thenReturn(ClientConfiguration.newBuilder()
-            .setEndpoints(FAKE_ACCESS_POINT).build());
+            .setEndpoints(FAKE_ENDPOINTS).build());
         assertFalse(processQueue.expired());
     }
 
@@ -124,7 +124,7 @@ public class ProcessQueueImplTest extends TestBase {
         messageViewList.add(corruptedMessageView0);
         
when(retryPolicy.getNextAttemptDelay(anyInt())).thenReturn(Duration.ofSeconds(1));
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
-        when(pushConsumerSettings.isFifo()).thenReturn(false);
+        when(pushSubscriptionSettings.isFifo()).thenReturn(false);
         when(pushConsumer.changeInvisibleDuration(any(MessageViewImpl.class), 
any(Duration.class)))
             .thenReturn(okChangeInvisibleDurationCtxFuture());
         processQueue.cacheMessages(messageViewList);
@@ -137,7 +137,7 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl corruptedMessageView0 = 
fakeMessageViewImpl(true);
         List<MessageViewImpl> messageViewList = new ArrayList<>();
         messageViewList.add(corruptedMessageView0);
-        when(pushConsumerSettings.isFifo()).thenReturn(true);
+        when(pushSubscriptionSettings.isFifo()).thenReturn(true);
         
when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class)))
             .thenReturn(okForwardMessageToDeadLetterQueueResponseFuture());
         processQueue.cacheMessages(messageViewList);
@@ -167,7 +167,7 @@ public class ProcessQueueImplTest extends TestBase {
         future0.set(receiveMessageResult);
         when(pushConsumer.receiveMessage(any(ReceiveMessageRequest.class), 
any(MessageQueueImpl.class),
             any(Duration.class))).thenReturn(future0);
-        when(pushConsumerSettings.getReceiveBatchSize()).thenReturn(32);
+        when(pushSubscriptionSettings.getReceiveBatchSize()).thenReturn(32);
         ReceiveMessageRequest request = 
ReceiveMessageRequest.newBuilder().build();
         when(pushConsumer.wrapReceiveMessageRequest(anyInt(), 
any(MessageQueueImpl.class),
             any(FilterExpression.class))).thenReturn(request);
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
index b1251fb..8eaaf52 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
@@ -65,8 +65,8 @@ public class PushConsumerBuilderImplTest extends TestBase {
     public void testBuildWithoutExpressions() throws ClientException {
         final PushConsumerBuilderImpl builder = new PushConsumerBuilderImpl();
         ClientConfiguration clientConfiguration =
-            
ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
-        
builder.setClientConfiguration(clientConfiguration).setConsumerGroup(FAKE_GROUP_0)
+            
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+        
builder.setClientConfiguration(clientConfiguration).setConsumerGroup(FAKE_CONSUMER_GROUP_0)
             .setMessageListener(messageView -> ConsumeResult.SUCCESS)
             .build();
     }
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
index ba65c05..673e711 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
@@ -52,10 +52,10 @@ public class PushConsumerImplTest extends TestBase {
     private final int consumptionThreadCount = 4;
 
     private final ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-        .setEndpoints(FAKE_ACCESS_POINT).build();
+        .setEndpoints(FAKE_ENDPOINTS).build();
 
     @Spy
-    private final PushConsumerImpl pushConsumer = new 
PushConsumerImpl(clientConfiguration, FAKE_GROUP_0,
+    private final PushConsumerImpl pushConsumer = new 
PushConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0,
         subscriptionExpressions, messageListener, maxCacheMessageCount, 
maxCacheMessageSizeInBytes,
         consumptionThreadCount);
 
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
new file mode 100644
index 0000000..90bbdeb
--- /dev/null
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import apache.rocketmq.v2.ClientType;
+import apache.rocketmq.v2.CustomizedBackoff;
+import apache.rocketmq.v2.FilterType;
+import apache.rocketmq.v2.Settings;
+import apache.rocketmq.v2.Subscription;
+import apache.rocketmq.v2.SubscriptionEntry;
+import com.google.protobuf.util.Durations;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.java.message.protocol.Resource;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PushSubscriptionSettingsTest extends TestBase {
+
+    @Test
+    public void testToProtobuf() {
+        Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+        String clientId = "clientId";
+        Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
+        subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression());
+        final Duration requestTimeout = Duration.ofSeconds(3);
+        final PushSubscriptionSettings pushSubscriptionSettings = new 
PushSubscriptionSettings(clientId,
+            fakeEndpoints(), groupResource, requestTimeout, 
subscriptionExpression);
+        final Settings settings = pushSubscriptionSettings.toProtobuf();
+        Assert.assertEquals(settings.getClientType(), 
ClientType.PUSH_CONSUMER);
+        Assert.assertEquals(settings.getRequestTimeout(), 
Durations.fromNanos(requestTimeout.toNanos()));
+        Assert.assertTrue(settings.hasSubscription());
+        final Subscription subscription = settings.getSubscription();
+        Assert.assertEquals(subscription.getGroup(),
+            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+        Assert.assertFalse(subscription.getFifo());
+        final List<SubscriptionEntry> subscriptionsList = 
subscription.getSubscriptionsList();
+        Assert.assertEquals(subscriptionsList.size(), 1);
+        final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
+        Assert.assertEquals(subscriptionEntry.getExpression().getType(), 
FilterType.TAG);
+        Assert.assertEquals(subscriptionEntry.getTopic(),
+            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+    }
+
+    @Test
+    public void testToProtobufWithSqlExpression() {
+        Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+        String clientId = "clientId";
+        Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
+        subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 
AND a < 100) OR (b IS NOT NULL AND "
+            + "b=TRUE)", FilterExpressionType.SQL92));
+        final Duration requestTimeout = Duration.ofSeconds(3);
+        final PushSubscriptionSettings pushSubscriptionSettings = new 
PushSubscriptionSettings(clientId,
+            fakeEndpoints(), groupResource, requestTimeout, 
subscriptionExpression);
+        final Settings settings = pushSubscriptionSettings.toProtobuf();
+        Assert.assertEquals(settings.getClientType(), 
ClientType.PUSH_CONSUMER);
+        Assert.assertEquals(settings.getRequestTimeout(), 
Durations.fromNanos(requestTimeout.toNanos()));
+        Assert.assertTrue(settings.hasSubscription());
+        final Subscription subscription = settings.getSubscription();
+        Assert.assertEquals(subscription.getGroup(),
+            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+        Assert.assertFalse(subscription.getFifo());
+        final List<SubscriptionEntry> subscriptionsList = 
subscription.getSubscriptionsList();
+        Assert.assertEquals(subscriptionsList.size(), 1);
+        final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
+        Assert.assertEquals(subscriptionEntry.getExpression().getType(), 
FilterType.SQL);
+        Assert.assertEquals(subscriptionEntry.getTopic(),
+            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+    }
+
+    @Test
+    public void testSync() {
+        com.google.protobuf.Duration duration0 = Durations.fromSeconds(1);
+        com.google.protobuf.Duration duration1 = Durations.fromSeconds(2);
+        com.google.protobuf.Duration duration2 = Durations.fromSeconds(3);
+        List<com.google.protobuf.Duration> durations = new ArrayList<>();
+        durations.add(duration0);
+        durations.add(duration1);
+        durations.add(duration2);
+        CustomizedBackoff customizedBackoff = 
CustomizedBackoff.newBuilder().addAllNext(durations).build();
+        apache.rocketmq.v2.RetryPolicy retryPolicy = 
apache.rocketmq.v2.RetryPolicy.newBuilder()
+            .setCustomizedBackoff(customizedBackoff).setMaxAttempts(3).build();
+        boolean fifo = true;
+        int receiveBatchSize = 96;
+        com.google.protobuf.Duration longPollingTimeout = 
Durations.fromSeconds(60);
+        Subscription subscription = 
Subscription.newBuilder().setFifo(fifo).setReceiveBatchSize(receiveBatchSize)
+            .setLongPollingTimeout(longPollingTimeout).build();
+        Settings settings = 
Settings.newBuilder().setSubscription(subscription).setBackoffPolicy(retryPolicy).build();
+        Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+        String clientId = "clientId";
+        Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
+        subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 
AND a < 100) OR (b IS NOT NULL AND "
+            + "b=TRUE)", FilterExpressionType.SQL92));
+        final Duration requestTimeout = Duration.ofSeconds(3);
+        final PushSubscriptionSettings pushSubscriptionSettings = new 
PushSubscriptionSettings(clientId,
+            fakeEndpoints(), groupResource, requestTimeout, 
subscriptionExpression);
+        pushSubscriptionSettings.sync(settings);
+    }
+}
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
index f3479e8..3f6002e 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
@@ -40,8 +40,8 @@ public class SimpleConsumerBuilderTest extends TestBase {
     public void testBuildWithoutExpressions() throws ClientException {
         final SimpleConsumerBuilderImpl builder = new 
SimpleConsumerBuilderImpl();
         ClientConfiguration clientConfiguration =
-            
ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
-        
builder.setClientConfiguration(clientConfiguration).setConsumerGroup(FAKE_GROUP_0)
+            
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+        
builder.setClientConfiguration(clientConfiguration).setConsumerGroup(FAKE_CONSUMER_GROUP_0)
             .build();
     }
 }
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
index 00dfe5a..6fc0709 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
@@ -56,7 +56,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 public class SimpleConsumerImplTest extends TestBase {
     @InjectMocks
     private final ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-        .setEndpoints(FAKE_ACCESS_POINT).build();
+        .setEndpoints(FAKE_ENDPOINTS).build();
 
     private final Duration awaitDuration = Duration.ofSeconds(3);
     private final Map<String, FilterExpression> subExpressions = 
createSubscriptionExpressions(FAKE_TOPIC_0);
@@ -65,31 +65,35 @@ public class SimpleConsumerImplTest extends TestBase {
 
     @Test(expected = IllegalStateException.class)
     public void testReceiveWithoutStart() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_GROUP_0, awaitDuration, subExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_CONSUMER_GROUP_0, awaitDuration,
+            subExpressions);
         simpleConsumer.receive(1, Duration.ofSeconds(1));
     }
 
     @Test(expected = IllegalStateException.class)
     public void testAckWithoutStart() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_GROUP_0, awaitDuration, subExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_CONSUMER_GROUP_0, awaitDuration,
+            subExpressions);
         simpleConsumer.ack(fakeMessageViewImpl());
     }
 
     @Test(expected = IllegalStateException.class)
     public void testSubscribeWithoutStart() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_GROUP_0, awaitDuration, subExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_CONSUMER_GROUP_0, awaitDuration,
+            subExpressions);
         simpleConsumer.subscribe(FAKE_TOPIC_1, FilterExpression.SUB_ALL);
     }
 
     @Test(expected = IllegalStateException.class)
     public void testUnsubscribeWithoutStart() {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_GROUP_0, awaitDuration, subExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_CONSUMER_GROUP_0, awaitDuration,
+            subExpressions);
         simpleConsumer.unsubscribe(FAKE_TOPIC_0);
     }
 
     @Test
     public void testReceiveAsyncWithZeroMaxMessageNum() throws 
InterruptedException {
-        simpleConsumer = Mockito.spy(new 
SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration,
+        simpleConsumer = Mockito.spy(new 
SimpleConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0, awaitDuration,
             subExpressions));
         when(simpleConsumer.isRunning()).thenReturn(true);
         final CompletableFuture<List<MessageView>> future = 
simpleConsumer.receiveAsync(0,
@@ -104,7 +108,7 @@ public class SimpleConsumerImplTest extends TestBase {
 
     @Test
     public void testAckAsync() throws ExecutionException, InterruptedException 
{
-        simpleConsumer = Mockito.spy(new 
SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration,
+        simpleConsumer = Mockito.spy(new 
SimpleConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0, awaitDuration,
             subExpressions));
         when(simpleConsumer.isRunning()).thenReturn(true);
         final MessageViewImpl messageView = fakeMessageViewImpl(false);
@@ -263,7 +267,7 @@ public class SimpleConsumerImplTest extends TestBase {
 
     @Test
     public void testChangeInvisibleDurationAsync() throws ExecutionException, 
InterruptedException {
-        simpleConsumer = Mockito.spy(new 
SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration,
+        simpleConsumer = Mockito.spy(new 
SimpleConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0, awaitDuration,
             subExpressions));
         when(simpleConsumer.isRunning()).thenReturn(true);
         final MessageViewImpl messageView = fakeMessageViewImpl(false);
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
new file mode 100644
index 0000000..ba0acf0
--- /dev/null
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import apache.rocketmq.v2.ClientType;
+import apache.rocketmq.v2.FilterType;
+import apache.rocketmq.v2.Settings;
+import apache.rocketmq.v2.Subscription;
+import apache.rocketmq.v2.SubscriptionEntry;
+import com.google.protobuf.util.Durations;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.java.message.protocol.Resource;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SimpleSubscriptionSettingsTest extends TestBase {
+
+    @Test
+    public void testToProtobuf() {
+        Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+        String clientId = "clientId";
+        Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
+        subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression());
+        final Duration requestTimeout = Duration.ofSeconds(3);
+        final Duration longPollingTimeout = Duration.ofSeconds(15);
+        final SimpleSubscriptionSettings simpleSubscriptionSettings = new 
SimpleSubscriptionSettings(clientId,
+            fakeEndpoints(), groupResource, requestTimeout, 
longPollingTimeout, subscriptionExpression);
+        final Settings settings = simpleSubscriptionSettings.toProtobuf();
+        Assert.assertEquals(settings.getClientType(), 
ClientType.SIMPLE_CONSUMER);
+        Assert.assertEquals(settings.getRequestTimeout(), 
Durations.fromNanos(requestTimeout.toNanos()));
+        Assert.assertTrue(settings.hasSubscription());
+        final Subscription subscription = settings.getSubscription();
+        Assert.assertEquals(subscription.getGroup(),
+            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+        Assert.assertFalse(subscription.getFifo());
+        Assert.assertEquals(subscription.getLongPollingTimeout(), 
Durations.fromNanos(longPollingTimeout.toNanos()));
+        final List<SubscriptionEntry> subscriptionsList = 
subscription.getSubscriptionsList();
+        Assert.assertEquals(subscriptionsList.size(), 1);
+        final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
+        Assert.assertEquals(subscriptionEntry.getExpression().getType(), 
FilterType.TAG);
+        Assert.assertEquals(subscriptionEntry.getTopic(),
+            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+    }
+
+    @Test
+    public void testToProtobufWithSqlExpression() {
+        Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+        String clientId = "clientId";
+        Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
+        subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 
AND a < 100) OR (b IS NOT NULL AND "
+            + "b=TRUE)", FilterExpressionType.SQL92));
+        final Duration requestTimeout = Duration.ofSeconds(3);
+        final Duration longPollingTimeout = Duration.ofSeconds(15);
+        final SimpleSubscriptionSettings simpleSubscriptionSettings = new 
SimpleSubscriptionSettings(clientId,
+            fakeEndpoints(), groupResource, requestTimeout, 
longPollingTimeout, subscriptionExpression);
+        final Settings settings = simpleSubscriptionSettings.toProtobuf();
+        Assert.assertEquals(settings.getClientType(), 
ClientType.SIMPLE_CONSUMER);
+        Assert.assertEquals(settings.getRequestTimeout(), 
Durations.fromNanos(requestTimeout.toNanos()));
+        Assert.assertTrue(settings.hasSubscription());
+        final Subscription subscription = settings.getSubscription();
+        Assert.assertEquals(subscription.getGroup(),
+            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+        Assert.assertFalse(subscription.getFifo());
+        Assert.assertEquals(subscription.getLongPollingTimeout(), 
Durations.fromNanos(longPollingTimeout.toNanos()));
+        final List<SubscriptionEntry> subscriptionsList = 
subscription.getSubscriptionsList();
+        Assert.assertEquals(subscriptionsList.size(), 1);
+        final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
+        Assert.assertEquals(subscriptionEntry.getExpression().getType(), 
FilterType.SQL);
+        Assert.assertEquals(subscriptionEntry.getTopic(),
+            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+    }
+
+}
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
index e1298bd..45d9c91 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
@@ -37,14 +37,12 @@ import 
org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.tool.TestBase;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class StandardConsumeServiceTest extends TestBase {
 
     @Test
-    @Ignore
-    public void testDispatch0() {
+    public void testDispatch() throws InterruptedException {
         final ProcessQueue processQueue0 = mock(ProcessQueue.class);
         final ProcessQueue processQueue1 = mock(ProcessQueue.class);
 
@@ -62,7 +60,6 @@ public class StandardConsumeServiceTest extends TestBase {
         
when(processQueue1.tryTakeMessage()).thenReturn(Optional.of(messageView1));
 
         MessageListener listener = messageView -> ConsumeResult.SUCCESS;
-
         MessageInterceptor interceptor = new MessageInterceptor() {
             @Override
             public void doBefore(MessageHookPoints messageHookPoints, 
List<MessageCommon> messageCommons) {
@@ -76,7 +73,10 @@ public class StandardConsumeServiceTest extends TestBase {
         final StandardConsumeService service = new 
StandardConsumeService(FAKE_CLIENT_ID, processQueueTable, listener,
             SINGLE_THREAD_POOL_EXECUTOR, interceptor, SCHEDULER);
         service.dispatch0();
+        Thread.sleep(1000);
+        verify(processQueue0, times(1)).tryTakeMessage();
+        verify(processQueue1, times(1)).tryTakeMessage();
         verify(processQueue0, 
times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
-        verify(processQueue0, 
times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
+        verify(processQueue1, 
times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
     }
 }
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
index 6f98f3f..9680a71 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.producer.TransactionResolution;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
@@ -37,21 +40,53 @@ public class ProducerBuilderImplTest {
         builder.setTopics(null);
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetIllegalTopic() {
+        final ProducerBuilderImpl builder = new ProducerBuilderImpl();
+        builder.setTopics("\t");
+    }
+
+    @Test
+    public void testSetTopic() {
+        final ProducerBuilderImpl builder = new ProducerBuilderImpl();
+        builder.setTopics("abc");
+    }
+
     @Test(expected = IllegalArgumentException.class)
     public void testSetNegativeMaxAttempts() {
         final ProducerBuilderImpl builder = new ProducerBuilderImpl();
         builder.setMaxAttempts(-1);
     }
 
+    @Test
+    public void testSetMaxAttempts() {
+        final ProducerBuilderImpl builder = new ProducerBuilderImpl();
+        builder.setMaxAttempts(3);
+    }
+
     @Test(expected = NullPointerException.class)
     public void testSetTransactionCheckerWithNull() {
         final ProducerBuilderImpl builder = new ProducerBuilderImpl();
         builder.setTransactionChecker(null);
     }
 
+    @Test
+    public void testSetTransactionChecker() {
+        final ProducerBuilderImpl builder = new ProducerBuilderImpl();
+        builder.setTransactionChecker(messageView -> 
TransactionResolution.COMMIT);
+    }
+
     @Test(expected = NullPointerException.class)
     public void testBuildWithoutClientConfiguration() {
         final ProducerBuilderImpl builder = new ProducerBuilderImpl();
         builder.build();
     }
+
+    @Test
+    public void testBuild() throws ClientException {
+        ClientConfiguration clientConfiguration =
+            
ClientConfiguration.newBuilder().setEndpoints("foobar.com:8081").build();
+        final ProducerBuilderImpl builder = new ProducerBuilderImpl();
+        builder.setClientConfiguration(clientConfiguration).build();
+    }
 }
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index ef94469..b4dae64 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -51,7 +51,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 @RunWith(MockitoJUnitRunner.class)
 public class ProducerImplTest extends TestBase {
     private final ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-        .setEndpoints(FAKE_ACCESS_POINT).build();
+        .setEndpoints(FAKE_ENDPOINTS).build();
 
     @SuppressWarnings("SameParameterValue")
     private ProducerImpl createProducerWithTopic(String topic) {
@@ -106,7 +106,7 @@ public class ProducerImplTest extends TestBase {
         Mockito.doReturn(Futures.immediateFailedFuture(exception))
             .when(producer).send0(any(Metadata.class), any(Endpoints.class), 
anyList(), any(MessageQueueImpl.class));
         producer.send(message);
-        final int maxAttempts = 
producer.producerSettings.getRetryPolicy().getMaxAttempts();
+        final int maxAttempts = 
producer.publishingSettings.getRetryPolicy().getMaxAttempts();
         verify(producer, times(maxAttempts)).send0(any(Metadata.class), 
any(Endpoints.class), anyList(),
             any(MessageQueueImpl.class));
         producer.close();
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
index e7670ed..d7d547f 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
@@ -17,11 +17,28 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.apis.producer.TransactionResolution;
+import org.apache.rocketmq.client.java.message.MessageCommon;
+import org.apache.rocketmq.client.java.message.MessageType;
+import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
+import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -29,6 +46,52 @@ public class TransactionImplTest extends TestBase {
     @Mock
     ProducerImpl producer;
 
+    @Test
+    public void testTryAddMessage() throws IOException {
+        Set<String> set = new HashSet<>();
+        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+        final ProducerImpl producer = Mockito.spy(new 
ProducerImpl(clientConfiguration, set, 1, null));
+        final TransactionImpl transaction = new TransactionImpl(producer);
+        final Message message = fakeMessage(FAKE_TOPIC_0);
+        final PublishingMessageImpl publishingMessage = 
transaction.tryAddMessage(message);
+        Assert.assertEquals(publishingMessage.getMessageType(), 
MessageType.TRANSACTION);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTryAddExceededMessages() throws IOException {
+        Set<String> set = new HashSet<>();
+        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+        final ProducerImpl producer = Mockito.spy(new 
ProducerImpl(clientConfiguration, set, 1, null));
+        final TransactionImpl transaction = new TransactionImpl(producer);
+        final Message message0 = fakeMessage(FAKE_TOPIC_0);
+        transaction.tryAddMessage(message0);
+        final Message message1 = fakeMessage(FAKE_TOPIC_0);
+        transaction.tryAddMessage(message1);
+    }
+
+    @Test
+    public void testTryAddReceipt() throws IOException, ClientException, 
ExecutionException, InterruptedException {
+        Set<String> set = new HashSet<>();
+        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+        final ProducerImpl producer = Mockito.spy(new 
ProducerImpl(clientConfiguration, set, 1, null));
+        final TransactionImpl transaction = new TransactionImpl(producer);
+        final Message message = fakeMessage(FAKE_TOPIC_0);
+        final PublishingMessageImpl publishingMessage = 
transaction.tryAddMessage(message);
+        final SendReceiptImpl sendReceipt = 
fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
+        transaction.tryAddReceipt(publishingMessage, sendReceipt);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTryAddReceiptNotContained() throws ClientException, 
ExecutionException, InterruptedException {
+        PublishingMessageImpl publishingMessage = 
Mockito.mock(PublishingMessageImpl.class);
+        Set<String> set = new HashSet<>();
+        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+        final ProducerImpl producer = Mockito.spy(new 
ProducerImpl(clientConfiguration, set, 1, null));
+        final TransactionImpl transaction = new TransactionImpl(producer);
+        final SendReceiptImpl sendReceipt = 
fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
+        transaction.tryAddReceipt(publishingMessage, sendReceipt);
+    }
+
     @Test(expected = IllegalStateException.class)
     public void testCommitWithNoReceipts() throws ClientException {
         final TransactionImpl transaction = new TransactionImpl(producer);
@@ -40,4 +103,34 @@ public class TransactionImplTest extends TestBase {
         final TransactionImpl transaction = new TransactionImpl(producer);
         transaction.rollback();
     }
+
+    @Test
+    public void testCommit() throws IOException, ClientException, 
ExecutionException, InterruptedException {
+        Set<String> set = new HashSet<>();
+        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+        final ProducerImpl producer = Mockito.spy(new 
ProducerImpl(clientConfiguration, set, 1, null));
+        final TransactionImpl transaction = new TransactionImpl(producer);
+        final Message message = fakeMessage(FAKE_TOPIC_0);
+        final PublishingMessageImpl publishingMessage = 
transaction.tryAddMessage(message);
+        final SendReceiptImpl sendReceipt = 
fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
+        transaction.tryAddReceipt(publishingMessage, sendReceipt);
+        
Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class), 
any(MessageCommon.class),
+            any(MessageId.class), anyString(), 
any(TransactionResolution.class));
+        transaction.commit();
+    }
+
+    @Test
+    public void testRollback() throws IOException, ClientException, 
ExecutionException, InterruptedException {
+        Set<String> set = new HashSet<>();
+        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+        final ProducerImpl producer = Mockito.spy(new 
ProducerImpl(clientConfiguration, set, 1, null));
+        final TransactionImpl transaction = new TransactionImpl(producer);
+        final Message message = fakeMessage(FAKE_TOPIC_0);
+        final PublishingMessageImpl publishingMessage = 
transaction.tryAddMessage(message);
+        final SendReceiptImpl sendReceipt = 
fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
+        transaction.tryAddReceipt(publishingMessage, sendReceipt);
+        
Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class), 
any(MessageCommon.class),
+            any(MessageId.class), anyString(), 
any(TransactionResolution.class));
+        transaction.rollback();
+    }
 }
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/protocol/EncodingTest.java
similarity index 50%
copy from 
java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
copy to 
java/client/src/test/java/org/apache/rocketmq/client/java/message/protocol/EncodingTest.java
index e7670ed..9030761 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/protocol/EncodingTest.java
@@ -15,29 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.client.java.impl.producer;
+package org.apache.rocketmq.client.java.message.protocol;
 
-import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
 
-@RunWith(MockitoJUnitRunner.class)
-public class TransactionImplTest extends TestBase {
-    @Mock
-    ProducerImpl producer;
+public class EncodingTest extends TestBase {
 
-    @Test(expected = IllegalStateException.class)
-    public void testCommitWithNoReceipts() throws ClientException {
-        final TransactionImpl transaction = new TransactionImpl(producer);
-        transaction.commit();
+    @Test
+    public void testToProtobuf() {
+        Assert.assertEquals(Encoding.toProtobuf(Encoding.IDENTITY), 
apache.rocketmq.v2.Encoding.IDENTITY);
+        Assert.assertEquals(Encoding.toProtobuf(Encoding.GZIP), 
apache.rocketmq.v2.Encoding.GZIP);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testRollbackWithNoReceipts() throws ClientException {
-        final TransactionImpl transaction = new TransactionImpl(producer);
-        transaction.rollback();
+    @Test
+    public void testFromProtobuf() {
+        
Assert.assertEquals(Encoding.fromProtobuf(apache.rocketmq.v2.Encoding.IDENTITY),
 Encoding.IDENTITY);
+        
Assert.assertEquals(Encoding.fromProtobuf(apache.rocketmq.v2.Encoding.GZIP), 
Encoding.GZIP);
+    }
+
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromProtobufWithUnspecified() {
+        
Encoding.fromProtobuf(apache.rocketmq.v2.Encoding.ENCODING_UNSPECIFIED);
     }
 }
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/protocol/ResourceTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/protocol/ResourceTest.java
new file mode 100644
index 0000000..d2638d6
--- /dev/null
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/protocol/ResourceTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.message.protocol;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ResourceTest extends TestBase {
+
+    @Test
+    public void testGetterAndSetter() {
+        Resource resource = new Resource("foobar");
+        Assert.assertEquals(resource.getName(), "foobar");
+        Assert.assertEquals(resource.getNamespace(), StringUtils.EMPTY);
+
+        resource = new Resource("foo", "bar");
+        Assert.assertEquals(resource.getName(), "bar");
+        Assert.assertEquals(resource.getNamespace(), "foo");
+    }
+
+    @Test
+    public void testToProtobuf() {
+        Resource resource = new Resource("foo", "bar");
+        final apache.rocketmq.v2.Resource protobuf = resource.toProtobuf();
+        Assert.assertEquals(protobuf.getResourceNamespace(), "foo");
+        Assert.assertEquals(protobuf.getName(), "bar");
+    }
+
+    @Test
+    public void testEqual() {
+        Resource resource0 = new Resource("foo", "bar");
+        Resource resource1 = new Resource("foo", "bar");
+        Assert.assertEquals(resource0, resource1);
+        Resource resource2 = new Resource("foo0", "bar");
+        Assert.assertNotEquals(resource0, resource2);
+    }
+}
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java
new file mode 100644
index 0000000..41f408d
--- /dev/null
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.misc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.NoSuchAlgorithmException;
+import org.junit.Test;
+
+public class UtilitiesTest {
+    private final String body = "foobar";
+
+    @Test
+    public void testCompressAndUncompressByteArray() throws IOException {
+        final byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
+        final byte[] compressedBytes = Utilities.compressBytesGzip(bytes, 5);
+        final byte[] originalBytes = 
Utilities.uncompressBytesGzip(compressedBytes);
+        assertEquals(new String(originalBytes, StandardCharsets.UTF_8), body);
+    }
+
+    @Test
+    public void testCrc32CheckSum() {
+        final byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
+        assertEquals("9EF61F95", Utilities.crc32CheckSum(bytes));
+    }
+
+    @Test
+    public void testMd5CheckSum() throws NoSuchAlgorithmException {
+        final byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
+        assertEquals("3858F62230AC3C915F300C664312C63F", 
Utilities.md5CheckSum(bytes));
+    }
+
+    @Test
+    public void testSha1CheckSum() throws NoSuchAlgorithmException {
+        final byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
+        assertEquals("8843D7F92416211DE9EBB963FF4CE28125932878", 
Utilities.sha1CheckSum(bytes));
+    }
+
+    @Test
+    public void testStackTrace() {
+        final String stackTrace = Utilities.stackTrace();
+        assertNotNull(stackTrace);
+        assertTrue(stackTrace.length() > 0);
+    }
+}
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index cd4cb85..3c1fd06 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -62,7 +62,7 @@ import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.message.MessageId;
-import org.apache.rocketmq.client.java.impl.producer.ProducerSettings;
+import org.apache.rocketmq.client.java.impl.producer.PublishingSettings;
 import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
 import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
 import org.apache.rocketmq.client.java.message.MessageIdCodec;
@@ -94,7 +94,7 @@ public class TestBase {
     protected static final String FAKE_RECEIPT_HANDLE_0 = "foo-bar-handle-0";
     protected static final String FAKE_RECEIPT_HANDLE_1 = "foo-bar-handle-1";
 
-    protected static final String FAKE_ACCESS_POINT = "127.0.0.1:9876";
+    protected static final String FAKE_ENDPOINTS = "127.0.0.1:9876";
 
     protected static final String FAKE_HOST_0 = "127.0.0.1";
     protected static final int FAKE_PORT_0 = 8080;
@@ -102,7 +102,7 @@ public class TestBase {
     protected static final String FAKE_HOST_1 = "127.0.0.2";
     protected static final int FAKE_PORT_1 = 8081;
 
-    protected static final String FAKE_GROUP_0 = "foo-bar-group-0";
+    protected static final String FAKE_CONSUMER_GROUP_0 = "foo-bar-group-0";
     protected static final String FAKE_TRANSACTION_ID = 
"foo-bar-transaction-id";
 
     protected static final long FAKE_OFFSET = 1;
@@ -358,8 +358,8 @@ public class TestBase {
         return new ExponentialBackoffRetryPolicy(3, Duration.ofMillis(100), 
Duration.ofSeconds(3), 2);
     }
 
-    protected ProducerSettings fakeProducerSettings() {
-        return new ProducerSettings(FAKE_CLIENT_ID, fakeEndpoints(), 
fakeExponentialBackoffRetryPolicy(),
+    protected PublishingSettings fakeProducerSettings() {
+        return new PublishingSettings(FAKE_CLIENT_ID, fakeEndpoints(), 
fakeExponentialBackoffRetryPolicy(),
             Duration.ofSeconds(1), new HashSet<>());
     }
 
diff --git a/java/style/spotbugs-suppressions.xml 
b/java/style/spotbugs-suppressions.xml
index 4e28e14..1be718a 100644
--- a/java/style/spotbugs-suppressions.xml
+++ b/java/style/spotbugs-suppressions.xml
@@ -40,25 +40,25 @@
     </Match>
 
     <Match>
-        <Class 
name="org.apache.rocketmq.client.java.impl.consumer.PushConsumerSettings" />
+        <Class 
name="org.apache.rocketmq.client.java.impl.consumer.PushSubscriptionSettings" />
         <Method name="applySettingsCommand" />
         <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
     </Match>
 
     <Match>
-        <Class 
name="org.apache.rocketmq.client.java.impl.consumer.PushConsumerSettings" />
+        <Class 
name="org.apache.rocketmq.client.java.impl.consumer.PushSubscriptionSettings" />
         <Method name="applySettingsCommand" />
         <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
     </Match>
 
     <Match>
-        <Class 
name="org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerSettings" />
+        <Class 
name="org.apache.rocketmq.client.java.impl.consumer.SimpleSubscriptionSettings" 
/>
         <Method name="applySettingsCommand" />
         <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
     </Match>
 
     <Match>
-        <Class 
name="org.apache.rocketmq.client.java.impl.producer.ProducerSettings" />
+        <Class 
name="org.apache.rocketmq.client.java.impl.producer.PublishingSettings" />
         <Method name="applySettingsCommand" />
         <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
     </Match>

Reply via email to