This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 8aaa7cd  Polish code (#111)
8aaa7cd is described below

commit 8aaa7cd912b2a8219b0a85de2d58271ebdb1cd75
Author: Aaron Ai <[email protected]>
AuthorDate: Sun Jul 31 19:18:43 2022 +0800

    Polish code (#111)
---
 .../client/apis/ClientConfigurationBuilder.java         |  6 +++++-
 .../rocketmq/client/apis/consumer/PushConsumer.java     |  2 +-
 .../client/apis/consumer/PushConsumerBuilder.java       |  6 +++++-
 .../rocketmq/client/apis/consumer/SimpleConsumer.java   |  6 ++++++
 .../client/apis/consumer/SimpleConsumerBuilder.java     |  9 ++++++++-
 .../apache/rocketmq/client/apis/producer/Producer.java  |  3 ++-
 .../rocketmq/client/apis/producer/ProducerBuilder.java  |  3 +--
 .../client/java/example/AsyncSimpleConsumerExample.java |  2 +-
 .../client/java/example/SimpleConsumerExample.java      |  2 +-
 .../client/java/impl/consumer/PushConsumerImpl.java     |  1 -
 .../client/java/impl/consumer/SimpleConsumerImpl.java   |  3 +++
 .../rocketmq/client/java/metrics/ClientMeter.java       | 17 ++++++++++++-----
 .../client/java/metrics/ClientMeterProvider.java        | 10 +++++-----
 .../org/apache/rocketmq/client/java/misc/Utilities.java |  5 +++++
 14 files changed, 55 insertions(+), 20 deletions(-)

diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
index f5f5c20..bf0f657 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
@@ -20,6 +20,8 @@ package org.apache.rocketmq.client.apis;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.time.Duration;
+import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
+import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
 
 /**
  * Builder to set {@link ClientConfiguration}.
@@ -56,7 +58,9 @@ public class ClientConfigurationBuilder {
     /**
      * Configure request timeout for ordinary RPC.
      *
-     * <p>request timeout is 1s by default. Especially, request timeout here 
does not work when RPC is long-polling.
+     * <p>request timeout is 3s by default. Especially, the RPC request 
timeout for long-polling of
+     * {@link SimpleConsumer} is increased by request timeout here based on the
+     * {@linkplain SimpleConsumerBuilder#setAwaitDuration(Duration) await 
duration}.
      *
      * @param requestTimeout RPC request timeout.
      * @return the client configuration builder instance.
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java
index c0ecf17..d123e75 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java
@@ -102,7 +102,7 @@ public interface PushConsumer extends Closeable {
      * Close the push consumer and release all related resources.
      *
      * <p>Once push consumer is closed, <strong>it could not be started once 
again.</strong> we maintained an FSM
-     * (finite-state machine) to record the different states for each producer.
+     * (finite-state machine) to record the different states for each push 
consumer.
      */
     @Override
     void close() throws IOException;
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
index d628c60..e971d94 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
@@ -82,7 +82,11 @@ public interface PushConsumerBuilder {
     PushConsumerBuilder setConsumptionThreadCount(int count);
 
     /**
-     * Finalize the build of {@link PushConsumer}.
+     * Finalize the build of {@link PushConsumer} and start.
+     *
+     * <p>This method will block until the push consumer starts successfully.
+     *
+     * <p>Especially, if this method is invoked more than once, different push 
consumers will be created and started.
      *
      * @return the push consumer instance.
      */
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
index 085139c..6d7d992 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
@@ -165,6 +165,12 @@ public interface SimpleConsumer extends Closeable {
      */
     CompletableFuture<Void> changeInvisibleDurationAsync(MessageView 
messageView, Duration invisibleDuration);
 
+    /**
+     * Close the simple consumer and release all related resources.
+     *
+     * <p>Once simple consumer is closed, <strong>it could not be started once 
again.</strong> we maintained an FSM
+     * (finite-state machine) to record the different states for each simple 
consumer.
+     */
     @Override
     void close() throws IOException;
 }
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
index 774d4d4..bd53ef0 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.client.apis.consumer;
 import java.time.Duration;
 import java.util.Map;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
 import org.apache.rocketmq.client.apis.ClientException;
 
 /**
@@ -52,9 +53,13 @@ public interface SimpleConsumerBuilder {
 
     /**
      * Set the max await time when receive messages from the server.
-     * The simple consumer will hold this long-polling receive requests until  
a message is returned or a timeout
+     *
+     * <p>The simple consumer will hold this long-polling receive requests 
until a message is returned or a timeout
      * occurs.
      *
+     * <p> Especially, the RPC request timeout for long-polling of {@link 
SimpleConsumer} is increased by
+     * {@linkplain ClientConfigurationBuilder#setRequestTimeout(Duration) 
request timeout} based on await duration here.
+     *
      * @param awaitDuration The maximum time to block when no message is 
available.
      * @return the consumer builder instance.
      */
@@ -65,6 +70,8 @@ public interface SimpleConsumerBuilder {
      *
      * <p>This method will block until the simple consumer starts successfully.
      *
+     * <p>Especially, if this method is invoked more than once, different 
simple consumers will be created and started.
+     *
      * @return the simple consumer instance.
      */
     SimpleConsumer build() throws ClientException;
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java
index 014aca9..71a3a57 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java
@@ -79,7 +79,8 @@ public interface Producer extends Closeable {
     /**
      * Closes the producer and releases all related resources.
      *
-     * <p>This method does not return until all related resource is released.
+     * <p>Once producer is closed, <strong>it could not be started once 
again.</strong> we maintained an FSM
+     * (finite-state machine) to record the different states for each producer.
      */
     @Override
     void close() throws IOException;
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/ProducerBuilder.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/ProducerBuilder.java
index 7c6b80b..cc4bdc6 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/ProducerBuilder.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/ProducerBuilder.java
@@ -77,8 +77,7 @@ public interface ProducerBuilder {
     /**
      * Finalize the build of {@link Producer} instance and start.
      *
-     * <p>The producer does a series of preparatory work during startup, which 
could help to identify more unexpected
-     * error earlier.
+     * <p>This method will block until the push consumer starts successfully.
      *
      * <p>Especially, if this method is invoked more than once, different 
producers will be created and started.
      *
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index 12bba02..3cabaa4 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -74,7 +74,7 @@ public class AsyncSimpleConsumerExample {
         // Max message num for each long polling.
         int maxMessageNum = 16;
         // Set message invisible duration after it is received.
-        Duration invisibleDuration = Duration.ofSeconds(30);
+        Duration invisibleDuration = Duration.ofSeconds(5);
         final CompletableFuture<List<MessageView>> future0 = 
consumer.receiveAsync(maxMessageNum, invisibleDuration);
         future0.thenAccept(message -> {
             final Map<MessageId, CompletableFuture<Void>> map =
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
index 5428145..63a035a 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
@@ -70,7 +70,7 @@ public class SimpleConsumerExample {
         // Max message num for each long polling.
         int maxMessageNum = 16;
         // Set message invisible duration after it is received.
-        Duration invisibleDuration = Duration.ofSeconds(30);
+        Duration invisibleDuration = Duration.ofSeconds(5);
         final List<MessageView> messages = consumer.receive(maxMessageNum, 
invisibleDuration);
         for (MessageView message : messages) {
             try {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index e98cd55..3d17ea9 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -434,7 +434,6 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
                         syncProcessQueue(topic, latest, filterExpression);
                     }
 
-                    @SuppressWarnings("NullableProblems")
                     @Override
                     public void onFailure(Throwable t) {
                         LOGGER.error("Exception raised while scanning the 
assignments, topic={}, clientId={}", topic,
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index d8ce5d8..9fad3a1 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -348,6 +348,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements 
SimpleConsumer {
         }, MoreExecutors.directExecutor());
     }
 
+    /**
+     * @see SimpleConsumer#close()
+     */
     @Override
     public void close() {
         this.stopAsync().awaitTerminated();
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
index baa4662..8acfe15 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
@@ -32,31 +32,37 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ClientMeter {
-    static ClientMeter DISABLED = new ClientMeter();
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ClientMeter.class);
 
     private final boolean enabled;
     private final Meter meter;
     private final Endpoints endpoints;
     private final SdkMeterProvider provider;
+    private final String clientId;
     private final ConcurrentMap<String /* histogram name */, DoubleHistogram> 
histogramMap;
 
-    public ClientMeter(Meter meter, Endpoints endpoints, SdkMeterProvider 
provider) {
+    public ClientMeter(Meter meter, Endpoints endpoints, SdkMeterProvider 
provider, String clientId) {
         this.enabled = true;
         this.meter = checkNotNull(meter, "meter should not be null");
         this.endpoints = checkNotNull(endpoints, "endpoints should not be 
null");
         this.provider = checkNotNull(provider, "provider should not be null");
+        this.clientId = checkNotNull(clientId, "clientId should not be null");
         this.histogramMap = new ConcurrentHashMap<>();
     }
 
-    private ClientMeter() {
+    private ClientMeter(String clientId) {
         this.enabled = false;
         this.meter = null;
         this.endpoints = null;
         this.provider = null;
+        this.clientId = checkNotNull(clientId, "clientId should not be null");
         this.histogramMap = new ConcurrentHashMap<>();
     }
 
+    static ClientMeter disabledInstance(String clientId) {
+        return new ClientMeter(clientId);
+    }
+
     public boolean isEnabled() {
         return enabled;
     }
@@ -79,8 +85,9 @@ public class ClientMeter {
         provider.shutdown().whenComplete(latch::countDown);
         try {
             latch.await();
+            LOGGER.info("Shutdown client meter successfully, clientId={}, 
endpoints={}", clientId, endpoints);
         } catch (Throwable t) {
-            LOGGER.error("Failed to shutdown message meter, endpoints={}", 
endpoints, t);
+            LOGGER.error("Failed to shutdown message meter, clientId={}, 
endpoints={}", clientId, endpoints, t);
         }
     }
 
@@ -96,7 +103,7 @@ public class ClientMeter {
         return MoreObjects.toStringHelper(this)
             .add("enabled", enabled)
             .add("meter", meter)
-            .add("metricEndpoints", endpoints)
+            .add("endpoints", endpoints)
             .add("provider", provider)
             .add("histogramMap", histogramMap)
             .toString();
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
index 532b787..4abddfd 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
@@ -60,7 +60,7 @@ public class ClientMeterProvider {
     public ClientMeterProvider(ClientImpl client) {
         this.client = client;
         this.client.registerMessageInterceptor(new 
MessageMeterInterceptor(this));
-        this.clientMeter = ClientMeter.DISABLED;
+        this.clientMeter = ClientMeter.disabledInstance(client.clientId());
         this.messageCacheObserver = null;
     }
 
@@ -76,13 +76,13 @@ public class ClientMeterProvider {
         final String clientId = client.clientId();
         try {
             if (clientMeter.satisfy(metric)) {
-                LOGGER.debug("Metric settings is satisfied by the current 
message meter, clientId={}", clientId);
+                LOGGER.info("Metric settings is satisfied by the current 
message meter, clientId={}", clientId);
                 return;
             }
             if (!metric.isOn()) {
-                LOGGER.debug("Metric is off, clientId={}", clientId);
+                LOGGER.info("Metric is off, clientId={}", clientId);
                 clientMeter.shutdown();
-                clientMeter = ClientMeter.DISABLED;
+                clientMeter = ClientMeter.disabledInstance(client.clientId());
                 return;
             }
             final Endpoints endpoints = metric.getEndpoints();
@@ -135,7 +135,7 @@ public class ClientMeterProvider {
 
             // Reset message meter.
             ClientMeter existedClientMeter = clientMeter;
-            clientMeter = new ClientMeter(meter, endpoints, provider);
+            clientMeter = new ClientMeter(meter, endpoints, provider, 
clientId);
             existedClientMeter.shutdown();
             LOGGER.info("Metrics is on, endpoints={}, clientId={}", endpoints, 
clientId);
 
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java 
b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
index 2705cd7..d036c14 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
@@ -33,6 +33,7 @@ import java.util.Enumeration;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.CRC32;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
@@ -46,6 +47,8 @@ public class Utilities {
     private static final String OS_NAME = "os.name";
     private static final String OS_VERSION = "os.version";
 
+    private static final AtomicLong CLIENT_INDEX = new AtomicLong(0);
+
     private static final Random RANDOM = new SecureRandom();
     private static final int PROCESS_ID_NOT_SET = -2;
     private static final int PROCESS_ID_NOT_FOUND = -1;
@@ -272,6 +275,8 @@ public class Utilities {
         sb.append(CLIENT_ID_SEPARATOR);
         sb.append(Utilities.processId());
         sb.append(CLIENT_ID_SEPARATOR);
+        sb.append(CLIENT_INDEX.getAndIncrement());
+        sb.append(CLIENT_ID_SEPARATOR);
         sb.append(Long.toString(System.nanoTime(), 36));
         return sb.toString();
     }

Reply via email to