This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 8aaa7cd Polish code (#111)
8aaa7cd is described below
commit 8aaa7cd912b2a8219b0a85de2d58271ebdb1cd75
Author: Aaron Ai <[email protected]>
AuthorDate: Sun Jul 31 19:18:43 2022 +0800
Polish code (#111)
---
.../client/apis/ClientConfigurationBuilder.java | 6 +++++-
.../rocketmq/client/apis/consumer/PushConsumer.java | 2 +-
.../client/apis/consumer/PushConsumerBuilder.java | 6 +++++-
.../rocketmq/client/apis/consumer/SimpleConsumer.java | 6 ++++++
.../client/apis/consumer/SimpleConsumerBuilder.java | 9 ++++++++-
.../apache/rocketmq/client/apis/producer/Producer.java | 3 ++-
.../rocketmq/client/apis/producer/ProducerBuilder.java | 3 +--
.../client/java/example/AsyncSimpleConsumerExample.java | 2 +-
.../client/java/example/SimpleConsumerExample.java | 2 +-
.../client/java/impl/consumer/PushConsumerImpl.java | 1 -
.../client/java/impl/consumer/SimpleConsumerImpl.java | 3 +++
.../rocketmq/client/java/metrics/ClientMeter.java | 17 ++++++++++++-----
.../client/java/metrics/ClientMeterProvider.java | 10 +++++-----
.../org/apache/rocketmq/client/java/misc/Utilities.java | 5 +++++
14 files changed, 55 insertions(+), 20 deletions(-)
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
index f5f5c20..bf0f657 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
@@ -20,6 +20,8 @@ package org.apache.rocketmq.client.apis;
import static com.google.common.base.Preconditions.checkNotNull;
import java.time.Duration;
+import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
+import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
/**
* Builder to set {@link ClientConfiguration}.
@@ -56,7 +58,9 @@ public class ClientConfigurationBuilder {
/**
* Configure request timeout for ordinary RPC.
*
- * <p>request timeout is 1s by default. Especially, request timeout here
does not work when RPC is long-polling.
+ * <p>The request timeout is 3s by default. Note that the RPC timeout
used for long-polling by
+ * {@link SimpleConsumer} is the request timeout set here added on top of the
+ * {@linkplain SimpleConsumerBuilder#setAwaitDuration(Duration) await
duration}.
*
* @param requestTimeout RPC request timeout.
* @return the client configuration builder instance.
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java
index c0ecf17..d123e75 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java
@@ -102,7 +102,7 @@ public interface PushConsumer extends Closeable {
* Close the push consumer and release all related resources.
*
* <p>Once push consumer is closed, <strong>it could not be started once
again.</strong> we maintained an FSM
- * (finite-state machine) to record the different states for each producer.
+ * (finite-state machine) to record the different states for each push
consumer.
*/
@Override
void close() throws IOException;
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
index d628c60..e971d94 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
@@ -82,7 +82,11 @@ public interface PushConsumerBuilder {
PushConsumerBuilder setConsumptionThreadCount(int count);
/**
- * Finalize the build of {@link PushConsumer}.
+ * Finalize the build of {@link PushConsumer} and start.
+ *
+ * <p>This method will block until the push consumer starts successfully.
+ *
+ * <p>Especially, if this method is invoked more than once, different push
consumers will be created and started.
*
* @return the push consumer instance.
*/
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
index 085139c..6d7d992 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
@@ -165,6 +165,12 @@ public interface SimpleConsumer extends Closeable {
*/
CompletableFuture<Void> changeInvisibleDurationAsync(MessageView
messageView, Duration invisibleDuration);
+ /**
+ * Close the simple consumer and release all related resources.
+ *
+ * <p>Once the simple consumer is closed, <strong>it cannot be started
again.</strong> An FSM
+ * (finite-state machine) is maintained to record the different states of each simple
consumer.
+ */
@Override
void close() throws IOException;
}
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
index 774d4d4..bd53ef0 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.client.apis.consumer;
import java.time.Duration;
import java.util.Map;
import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
/**
@@ -52,9 +53,13 @@ public interface SimpleConsumerBuilder {
/**
* Set the max await time when receive messages from the server.
- * The simple consumer will hold this long-polling receive requests until
a message is returned or a timeout
+ *
+ * <p>The simple consumer will hold this long-polling receive requests
until a message is returned or a timeout
* occurs.
*
+ * <p>Note that the RPC timeout used for long-polling by {@link
SimpleConsumer} is the await duration set here plus the
+ * {@linkplain ClientConfigurationBuilder#setRequestTimeout(Duration)
request timeout}.
+ *
* @param awaitDuration The maximum time to block when no message is
available.
* @return the consumer builder instance.
*/
@@ -65,6 +70,8 @@ public interface SimpleConsumerBuilder {
*
* <p>This method will block until the simple consumer starts successfully.
*
+ * <p>Especially, if this method is invoked more than once, different
simple consumers will be created and started.
+ *
* @return the simple consumer instance.
*/
SimpleConsumer build() throws ClientException;
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java
index 014aca9..71a3a57 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java
@@ -79,7 +79,8 @@ public interface Producer extends Closeable {
/**
* Closes the producer and releases all related resources.
*
- * <p>This method does not return until all related resource is released.
+ * <p>Once the producer is closed, <strong>it cannot be started
again.</strong> An FSM
+ * (finite-state machine) is maintained to record the different states of each producer.
*/
@Override
void close() throws IOException;
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/ProducerBuilder.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/ProducerBuilder.java
index 7c6b80b..cc4bdc6 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/ProducerBuilder.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/ProducerBuilder.java
@@ -77,8 +77,7 @@ public interface ProducerBuilder {
/**
* Finalize the build of {@link Producer} instance and start.
*
- * <p>The producer does a series of preparatory work during startup, which
could help to identify more unexpected
- * error earlier.
+ * <p>This method will block until the producer starts successfully.
*
* <p>Especially, if this method is invoked more than once, different
producers will be created and started.
*
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index 12bba02..3cabaa4 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -74,7 +74,7 @@ public class AsyncSimpleConsumerExample {
// Max message num for each long polling.
int maxMessageNum = 16;
// Set message invisible duration after it is received.
- Duration invisibleDuration = Duration.ofSeconds(30);
+ Duration invisibleDuration = Duration.ofSeconds(5);
final CompletableFuture<List<MessageView>> future0 =
consumer.receiveAsync(maxMessageNum, invisibleDuration);
future0.thenAccept(message -> {
final Map<MessageId, CompletableFuture<Void>> map =
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
index 5428145..63a035a 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
@@ -70,7 +70,7 @@ public class SimpleConsumerExample {
// Max message num for each long polling.
int maxMessageNum = 16;
// Set message invisible duration after it is received.
- Duration invisibleDuration = Duration.ofSeconds(30);
+ Duration invisibleDuration = Duration.ofSeconds(5);
final List<MessageView> messages = consumer.receive(maxMessageNum,
invisibleDuration);
for (MessageView message : messages) {
try {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index e98cd55..3d17ea9 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -434,7 +434,6 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer, MessageCach
syncProcessQueue(topic, latest, filterExpression);
}
- @SuppressWarnings("NullableProblems")
@Override
public void onFailure(Throwable t) {
LOGGER.error("Exception raised while scanning the
assignments, topic={}, clientId={}", topic,
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index d8ce5d8..9fad3a1 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -348,6 +348,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements
SimpleConsumer {
}, MoreExecutors.directExecutor());
}
+ /**
+ * @see SimpleConsumer#close()
+ */
@Override
public void close() {
this.stopAsync().awaitTerminated();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
index baa4662..8acfe15 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
@@ -32,31 +32,37 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientMeter {
- static ClientMeter DISABLED = new ClientMeter();
private static final Logger LOGGER =
LoggerFactory.getLogger(ClientMeter.class);
private final boolean enabled;
private final Meter meter;
private final Endpoints endpoints;
private final SdkMeterProvider provider;
+ private final String clientId;
private final ConcurrentMap<String /* histogram name */, DoubleHistogram>
histogramMap;
- public ClientMeter(Meter meter, Endpoints endpoints, SdkMeterProvider
provider) {
+ public ClientMeter(Meter meter, Endpoints endpoints, SdkMeterProvider
provider, String clientId) {
this.enabled = true;
this.meter = checkNotNull(meter, "meter should not be null");
this.endpoints = checkNotNull(endpoints, "endpoints should not be
null");
this.provider = checkNotNull(provider, "provider should not be null");
+ this.clientId = checkNotNull(clientId, "clientId should not be null");
this.histogramMap = new ConcurrentHashMap<>();
}
- private ClientMeter() {
+ private ClientMeter(String clientId) {
this.enabled = false;
this.meter = null;
this.endpoints = null;
this.provider = null;
+ this.clientId = checkNotNull(clientId, "clientId should not be null");
this.histogramMap = new ConcurrentHashMap<>();
}
+ static ClientMeter disabledInstance(String clientId) {
+ return new ClientMeter(clientId);
+ }
+
public boolean isEnabled() {
return enabled;
}
@@ -79,8 +85,9 @@ public class ClientMeter {
provider.shutdown().whenComplete(latch::countDown);
try {
latch.await();
+ LOGGER.info("Shutdown client meter successfully, clientId={},
endpoints={}", clientId, endpoints);
} catch (Throwable t) {
- LOGGER.error("Failed to shutdown message meter, endpoints={}",
endpoints, t);
+ LOGGER.error("Failed to shutdown message meter, clientId={},
endpoints={}", clientId, endpoints, t);
}
}
@@ -96,7 +103,7 @@ public class ClientMeter {
return MoreObjects.toStringHelper(this)
.add("enabled", enabled)
.add("meter", meter)
- .add("metricEndpoints", endpoints)
+ .add("endpoints", endpoints)
.add("provider", provider)
.add("histogramMap", histogramMap)
.toString();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
index 532b787..4abddfd 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
@@ -60,7 +60,7 @@ public class ClientMeterProvider {
public ClientMeterProvider(ClientImpl client) {
this.client = client;
this.client.registerMessageInterceptor(new
MessageMeterInterceptor(this));
- this.clientMeter = ClientMeter.DISABLED;
+ this.clientMeter = ClientMeter.disabledInstance(client.clientId());
this.messageCacheObserver = null;
}
@@ -76,13 +76,13 @@ public class ClientMeterProvider {
final String clientId = client.clientId();
try {
if (clientMeter.satisfy(metric)) {
- LOGGER.debug("Metric settings is satisfied by the current
message meter, clientId={}", clientId);
+ LOGGER.info("Metric settings is satisfied by the current
message meter, clientId={}", clientId);
return;
}
if (!metric.isOn()) {
- LOGGER.debug("Metric is off, clientId={}", clientId);
+ LOGGER.info("Metric is off, clientId={}", clientId);
clientMeter.shutdown();
- clientMeter = ClientMeter.DISABLED;
+ clientMeter = ClientMeter.disabledInstance(client.clientId());
return;
}
final Endpoints endpoints = metric.getEndpoints();
@@ -135,7 +135,7 @@ public class ClientMeterProvider {
// Reset message meter.
ClientMeter existedClientMeter = clientMeter;
- clientMeter = new ClientMeter(meter, endpoints, provider);
+ clientMeter = new ClientMeter(meter, endpoints, provider,
clientId);
existedClientMeter.shutdown();
LOGGER.info("Metrics is on, endpoints={}, clientId={}", endpoints,
clientId);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
index 2705cd7..d036c14 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
@@ -33,6 +33,7 @@ import java.util.Enumeration;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.CRC32;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
@@ -46,6 +47,8 @@ public class Utilities {
private static final String OS_NAME = "os.name";
private static final String OS_VERSION = "os.version";
+ private static final AtomicLong CLIENT_INDEX = new AtomicLong(0);
+
private static final Random RANDOM = new SecureRandom();
private static final int PROCESS_ID_NOT_SET = -2;
private static final int PROCESS_ID_NOT_FOUND = -1;
@@ -272,6 +275,8 @@ public class Utilities {
sb.append(CLIENT_ID_SEPARATOR);
sb.append(Utilities.processId());
sb.append(CLIENT_ID_SEPARATOR);
+ sb.append(CLIENT_INDEX.getAndIncrement());
+ sb.append(CLIENT_ID_SEPARATOR);
sb.append(Long.toString(System.nanoTime(), 36));
return sb.toString();
}