This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 42950d9 Adapt to the latest status code (#43)
42950d9 is described below
commit 42950d9121172dadc4d40818991e117198555e8c
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Jul 13 15:42:49 2022 +0800
Adapt to the latest status code (#43)
* Set grpc log level to error
* Adapt to the status code
* Follow the latest parameter limitation
* Fix the naming issue
* Add pull_request#types in github action
---
.github/workflows/cpp_build.yml | 5 +++-
.github/workflows/csharp_build.yml | 10 ++++---
.github/workflows/golang_build.yml | 5 +++-
.github/workflows/java_build.yml | 28 ++++++++++---------
.../src/main/resources/rocketmq.logback.xml | 3 +-
.../rocketmq/client/java/impl/ClientImpl.java | 9 +++---
.../client/java/impl/consumer/ConsumerImpl.java | 11 ++++----
.../impl/consumer/PushConsumerBuilderImpl.java | 8 ++----
.../java/impl/consumer/PushConsumerImpl.java | 2 +-
.../java/impl/consumer/ReceiveMessageResult.java | 1 +
.../impl/consumer/SimpleConsumerBuilderImpl.java | 4 +--
.../java/impl/consumer/SimpleConsumerImpl.java | 1 +
.../client/java/impl/producer/SendReceiptImpl.java | 1 +
.../client/java/message/MessageBuilderImpl.java | 2 +-
.../{MessageMeter.java => ClientMeter.java} | 10 +++----
...MeterProvider.java => ClientMeterProvider.java} | 32 ++++++++++------------
...terceptor.java => MessageMeterInterceptor.java} | 32 +++++++++++-----------
java/client/src/main/resources/logback.xml | 3 +-
.../impl/consumer/PushConsumerBuilderImplTest.java | 8 ------
.../impl/consumer/SimpleConsumerBuilderTest.java | 8 ------
.../impl/producer/ProducerBuilderImplTest.java | 8 ------
.../client/java/message/MessageImplTest.java | 6 ----
22 files changed, 86 insertions(+), 111 deletions(-)
diff --git a/.github/workflows/cpp_build.yml b/.github/workflows/cpp_build.yml
index 47c6cd6..6e9504e 100644
--- a/.github/workflows/cpp_build.yml
+++ b/.github/workflows/cpp_build.yml
@@ -1,5 +1,8 @@
name: CPP Build
-on: [push, pull_request]
+on:
+ pull_request:
+ types: [opened, reopened]
+ push:
jobs:
cpp_build:
name: "CPP (${{ matrix.os }})"
diff --git a/.github/workflows/csharp_build.yml
b/.github/workflows/csharp_build.yml
index 241ef69..54bd2d4 100644
--- a/.github/workflows/csharp_build.yml
+++ b/.github/workflows/csharp_build.yml
@@ -1,16 +1,19 @@
name: C# Build
-on: [push, pull_request]
+on:
+ pull_request:
+ types: [opened, reopened]
+ push:
jobs:
c_sharp:
name: "C# (ubuntu-18.04)"
runs-on: ubuntu-18.04
steps:
- - name: Checkout
+ - name: Checkout
uses: actions/checkout@v2
- name: Setup dotnet
uses: actions/setup-dotnet@v1
with:
- dotnet-version: |
+ dotnet-version: |
5.0.x
6.0.x
- name: Build artifacts
@@ -18,4 +21,3 @@ jobs:
run: |
dotnet --version
dotnet build
-
diff --git a/.github/workflows/golang_build.yml
b/.github/workflows/golang_build.yml
index 0212df2..9e34eb5 100644
--- a/.github/workflows/golang_build.yml
+++ b/.github/workflows/golang_build.yml
@@ -1,5 +1,8 @@
name: Golang Build
-on: [push, pull_request]
+on:
+ pull_request:
+ types: [opened, reopened]
+ push:
jobs:
cpp_build:
name: "Golang (${{ matrix.os }}, Go ${{ matrix.go }})"
diff --git a/.github/workflows/java_build.yml b/.github/workflows/java_build.yml
index 2e79258..5b31ec1 100644
--- a/.github/workflows/java_build.yml
+++ b/.github/workflows/java_build.yml
@@ -1,5 +1,8 @@
name: Java Build
-on: [push, pull_request]
+on:
+ pull_request:
+ types: [opened, reopened]
+ push:
jobs:
java_build:
name: "Java (${{ matrix.os }}, JDK-${{ matrix.jdk }})"
@@ -9,15 +12,14 @@ jobs:
os: [ubuntu-18.04, windows-2019]
jdk: [11, 17]
steps:
- - name: Checkout
- uses: actions/checkout@v2
- - name: Set up JDK ${{ matrix.jdk }}
- uses: actions/setup-java@v2
- with:
- java-version: ${{ matrix.jdk }}
- distribution: 'adopt'
- cache: maven
- - name: Build with Maven
- working-directory: ./java
- run: mvn -B package --file pom.xml
-
+ - name: Checkout
+ uses: actions/checkout@v2
+ - name: Set up JDK ${{ matrix.jdk }}
+ uses: actions/setup-java@v2
+ with:
+ java-version: ${{ matrix.jdk }}
+ distribution: "adopt"
+ cache: maven
+ - name: Build with Maven
+ working-directory: ./java
+ run: mvn -B package --file pom.xml
diff --git a/java/client-shade/src/main/resources/rocketmq.logback.xml
b/java/client-shade/src/main/resources/rocketmq.logback.xml
index b7f694b..344b29f 100644
--- a/java/client-shade/src/main/resources/rocketmq.logback.xml
+++ b/java/client-shade/src/main/resources/rocketmq.logback.xml
@@ -33,6 +33,5 @@
<appender-ref ref="DefaultAppender" additivity="false"/>
</root>
<!-- ref: https://github.com/grpc/grpc-java/issues/3033 -->
- <logger name="io.grpc" level="error"/>
- <logger name="org.apache.rocketmq.shaded.io.grpc" level="error"/>
+ <logger name="org.apache.rocketmq.shaded.io.grpc"
level="${rocketmq.grpc.log.level:-error}"/>
</configuration>
\ No newline at end of file
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 0c801d5..3cf8a36 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -72,7 +72,7 @@ import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageCommon;
-import org.apache.rocketmq.client.java.metrics.MessageMeterProvider;
+import org.apache.rocketmq.client.java.metrics.ClientMeterProvider;
import org.apache.rocketmq.client.java.metrics.Metric;
import org.apache.rocketmq.client.java.misc.ExecutorServices;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
@@ -97,7 +97,7 @@ public abstract class ClientImpl extends AbstractIdleService
implements Client,
// Thread-safe set.
protected final Set<Endpoints> isolated;
protected final ExecutorService clientCallbackExecutor;
- protected final MessageMeterProvider messageMeterProvider;
+ protected final ClientMeterProvider clientMeterProvider;
/**
* Telemetry command executor, which is aims to execute commands from
remote.
*/
@@ -147,8 +147,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
new LinkedBlockingQueue<>(),
new ThreadFactoryImpl("ClientCallbackWorker"));
- this.messageMeterProvider = new MessageMeterProvider(this);
-
+ this.clientMeterProvider = new ClientMeterProvider(this);
this.telemetryCommandExecutor = new ThreadPoolExecutor(
1,
1,
@@ -317,7 +316,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
*/
public final void onSettingsCommand(Endpoints endpoints, Settings
settings) {
final Metric metric = new Metric(settings.getMetric());
- messageMeterProvider.reset(metric);
+ clientMeterProvider.reset(metric);
LOGGER.info("Receive settings from remote, endpoints={}", endpoints);
this.getClientSettings().applySettingsCommand(settings);
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index a8b04ec..2f50f2a 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -46,6 +46,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.regex.Pattern;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.message.MessageId;
@@ -62,6 +63,7 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
abstract class ConsumerImpl extends ClientImpl {
+ static final Pattern CONSUMER_GROUP_PATTERN =
Pattern.compile("^[%a-zA-Z0-9_-]+$");
private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumerImpl.class);
private final String consumerGroup;
@@ -75,7 +77,6 @@ abstract class ConsumerImpl extends ClientImpl {
protected ListenableFuture<ReceiveMessageResult>
receiveMessage(ReceiveMessageRequest request,
MessageQueueImpl mq, Duration timeout) {
List<MessageViewImpl> messages = new ArrayList<>();
- final SettableFuture<ReceiveMessageResult> future0 =
SettableFuture.create();
try {
Metadata metadata = sign();
final Endpoints endpoints = mq.getBroker().getEndpoints();
@@ -84,8 +85,9 @@ abstract class ConsumerImpl extends ClientImpl {
metadata, request, timeout);
return Futures.transform(future, context -> {
final Iterator<ReceiveMessageResponse> it = context.getResp();
- // Null here means status not set yet.
- Status status = null;
+ Status status =
Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
+ .setMessage("status was not set by server")
+ .build();
Timestamp deliveryTimestampFromRemote = null;
List<Message> messageList = new ArrayList<>();
while (it.hasNext()) {
@@ -112,8 +114,7 @@ abstract class ConsumerImpl extends ClientImpl {
return new ReceiveMessageResult(endpoints,
context.getRpcContext().getRequestId(), status, messages);
}, MoreExecutors.directExecutor());
} catch (Throwable t) {
- future0.setException(t);
- return future0;
+ return Futures.immediateFailedFuture(t);
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
index de3c6ef..d270ae7 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
@@ -19,10 +19,10 @@ package org.apache.rocketmq.client.java.impl.consumer;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static
org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl.CONSUMER_GROUP_PATTERN;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Pattern;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
@@ -34,14 +34,12 @@ import
org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
* Implementation of {@link PushConsumerBuilder}
*/
public class PushConsumerBuilderImpl implements PushConsumerBuilder {
- private static final Pattern CONSUMER_GROUP_PATTERN =
Pattern.compile("^[%|a-zA-Z0-9._-]{1,255}$");
-
private ClientConfiguration clientConfiguration = null;
private String consumerGroup = null;
private Map<String, FilterExpression> subscriptionExpressions = new
ConcurrentHashMap<>();
private MessageListener messageListener = null;
- private int maxCacheMessageCount = 5000;
- private int maxCacheMessageSizeInBytes = 500 * 1024 * 1024;
+ private int maxCacheMessageCount = 1024;
+ private int maxCacheMessageSizeInBytes = 64 * 1024 * 1024;
private int consumptionThreadCount = 20;
/**
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 9b34b56..7688b4b 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -153,7 +153,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer, MessageCach
try {
LOGGER.info("Begin to start the rocketmq push consumer,
clientId={}", clientId);
super.startUp();
- messageMeterProvider.setMessageCacheObserver(this);
+ clientMeterProvider.setMessageCacheObserver(this);
final ScheduledExecutorService scheduler =
clientManager.getScheduler();
this.consumeService = createConsumeService();
this.consumeService.startAsync().awaitRunning();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
index b107d78..58c2206 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
@@ -54,6 +54,7 @@ public class ReceiveMessageResult {
case ILLEGAL_TOPIC:
case ILLEGAL_CONSUMER_GROUP:
case ILLEGAL_FILTER_EXPRESSION:
+ case ILLEGAL_INVISIBLE_TIME:
case CLIENT_ID_REQUIRED:
this.exception = new BadRequestException(code.getNumber(),
status.getMessage());
break;
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
index 297845d..da31500 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
@@ -19,11 +19,11 @@ package org.apache.rocketmq.client.java.impl.consumer;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static
org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl.CONSUMER_GROUP_PATTERN;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Pattern;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
@@ -31,8 +31,6 @@ import
org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
public class SimpleConsumerBuilderImpl implements SimpleConsumerBuilder {
- private static final Pattern CONSUMER_GROUP_PATTERN =
Pattern.compile("^[%|a-zA-Z0-9._-]{1,255}$");
-
private ClientConfiguration clientConfiguration = null;
private String consumerGroup = null;
private Map<String, FilterExpression> subscriptionExpressions = new
ConcurrentHashMap<>();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index 77750e1..f5029f1 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -316,6 +316,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements
SimpleConsumer {
case BAD_REQUEST:
case ILLEGAL_TOPIC:
case ILLEGAL_CONSUMER_GROUP:
+ case ILLEGAL_INVISIBLE_TIME:
case INVALID_RECEIPT_HANDLE:
case CLIENT_ID_REQUIRED:
throw new BadRequestException(code.getNumber(),
status.getMessage());
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
index 74a6f10..400b2bc 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
@@ -98,6 +98,7 @@ public class SendReceiptImpl implements SendReceipt {
case ILLEGAL_MESSAGE_GROUP:
case ILLEGAL_MESSAGE_PROPERTY_KEY:
case ILLEGAL_MESSAGE_ID:
+ case ILLEGAL_DELIVERY_TIME:
case MESSAGE_PROPERTY_CONFLICT_WITH_TYPE:
case MESSAGE_CORRUPTED:
case CLIENT_ID_REQUIRED:
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
index 112fbb7..b53f20d 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
@@ -32,7 +32,7 @@ import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
public class MessageBuilderImpl implements MessageBuilder {
- public static final Pattern TOPIC_PATTERN =
Pattern.compile("^[%|a-zA-Z0-9._-]{1,127}$");
+ public static final Pattern TOPIC_PATTERN =
Pattern.compile("^[%a-zA-Z0-9_-]+$");
private static final int MESSAGE_BODY_LENGTH_THRESHOLD = 1024 * 1024 * 4;
private String topic = null;
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
similarity index 94%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
index a70f0d7..74a442a 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
@@ -31,9 +31,9 @@ import org.apache.rocketmq.client.java.route.Endpoints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MessageMeter {
- static MessageMeter DISABLED = new MessageMeter();
- private static final Logger LOGGER =
LoggerFactory.getLogger(MessageMeter.class);
+public class ClientMeter {
+ static ClientMeter DISABLED = new ClientMeter();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ClientMeter.class);
private final boolean enabled;
private final Meter meter;
@@ -41,7 +41,7 @@ public class MessageMeter {
private final SdkMeterProvider provider;
private final ConcurrentMap<MetricName, DoubleHistogram> histogramMap;
- public MessageMeter(Meter meter, Endpoints endpoints, SdkMeterProvider
provider) {
+ public ClientMeter(Meter meter, Endpoints endpoints, SdkMeterProvider
provider) {
this.enabled = true;
this.meter = checkNotNull(meter, "meter should not be null");
this.endpoints = checkNotNull(endpoints, "endpoints should not be
null");
@@ -49,7 +49,7 @@ public class MessageMeter {
this.histogramMap = new ConcurrentHashMap<>();
}
- private MessageMeter() {
+ private ClientMeter() {
this.enabled = false;
this.meter = null;
this.endpoints = null;
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterProvider.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
similarity index 92%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterProvider.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
index 6a1b548..9451028 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterProvider.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
@@ -46,23 +46,21 @@ import
org.apache.rocketmq.client.java.rpc.IpNameResolverFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MessageMeterProvider {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MessageMeterProvider.class);
+public class ClientMeterProvider {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ClientMeterProvider.class);
private static final Duration METRIC_EXPORTER_RPC_TIMEOUT =
Duration.ofSeconds(3);
- private static final Duration METRIC_READER_INTERVAL =
Duration.ofSeconds(1);
+ private static final Duration METRIC_READER_INTERVAL =
Duration.ofMinutes(1);
private static final String METRIC_INSTRUMENTATION_NAME =
"org.apache.rocketmq.message";
private final ClientImpl client;
-
- private volatile MessageMeter messageMeter;
-
+ private volatile ClientMeter clientMeter;
private volatile MessageCacheObserver messageCacheObserver;
- public MessageMeterProvider(ClientImpl client) {
+ public ClientMeterProvider(ClientImpl client) {
this.client = client;
- this.client.registerMessageInterceptor(new
MetricMessageInterceptor(this));
- this.messageMeter = MessageMeter.DISABLED;
+ this.client.registerMessageInterceptor(new
MessageMeterInterceptor(this));
+ this.clientMeter = ClientMeter.DISABLED;
this.messageCacheObserver = null;
}
@@ -71,20 +69,20 @@ public class MessageMeterProvider {
}
Optional<DoubleHistogram> getHistogramByName(MetricName metricName) {
- return messageMeter.getHistogramByName(metricName);
+ return clientMeter.getHistogramByName(metricName);
}
public synchronized void reset(Metric metric) {
final String clientId = client.getClientId();
try {
- if (messageMeter.satisfy(metric)) {
+ if (clientMeter.satisfy(metric)) {
LOGGER.debug("Metric settings is satisfied by the current
message meter, clientId={}", clientId);
return;
}
if (!metric.isOn()) {
LOGGER.debug("Metric is off, clientId={}", clientId);
- messageMeter.shutdown();
- messageMeter = MessageMeter.DISABLED;
+ clientMeter.shutdown();
+ clientMeter = ClientMeter.DISABLED;
return;
}
final Endpoints endpoints = metric.getEndpoints();
@@ -135,9 +133,9 @@ public class MessageMeterProvider {
Meter meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
// Reset message meter.
- MessageMeter existedMessageMeter = messageMeter;
- messageMeter = new MessageMeter(meter, endpoints, provider);
- existedMessageMeter.shutdown();
+ ClientMeter existedClientMeter = clientMeter;
+ clientMeter = new ClientMeter(meter, endpoints, provider);
+ existedClientMeter.shutdown();
LOGGER.info("Metrics is on, endpoints={}, clientId={}", endpoints,
clientId);
if (!(client instanceof PushConsumer)) {
@@ -173,7 +171,7 @@ public class MessageMeterProvider {
}
public boolean isEnabled() {
- return messageMeter.isEnabled();
+ return clientMeter.isEnabled();
}
public ClientImpl getClient() {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
similarity index 87%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
index a798455..d9bd316 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
@@ -34,19 +34,19 @@ import
org.apache.rocketmq.client.java.message.MessageCommon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MetricMessageInterceptor implements MessageInterceptor {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MetricMessageInterceptor.class);
+public class MessageMeterInterceptor implements MessageInterceptor {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MessageMeterInterceptor.class);
- private final MessageMeterProvider messageMeterProvider;
+ private final ClientMeterProvider clientMeterProvider;
- public MetricMessageInterceptor(MessageMeterProvider messageMeterProvider)
{
- this.messageMeterProvider = messageMeterProvider;
+ public MessageMeterInterceptor(ClientMeterProvider clientMeterProvider) {
+ this.clientMeterProvider = clientMeterProvider;
}
private void doAfterSendMessage(List<MessageCommon> messageCommons,
Duration duration,
MessageHookPointsStatus status) {
final Optional<DoubleHistogram> optionalHistogram =
-
messageMeterProvider.getHistogramByName(MetricName.SEND_SUCCESS_COST_TIME);
+
clientMeterProvider.getHistogramByName(MetricName.SEND_SUCCESS_COST_TIME);
if (!optionalHistogram.isPresent()) {
return;
}
@@ -55,7 +55,7 @@ public class MetricMessageInterceptor implements
MessageInterceptor {
InvocationStatus invocationStatus =
MessageHookPointsStatus.OK.equals(status) ? InvocationStatus.SUCCESS :
InvocationStatus.FAILURE;
Attributes attributes =
Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
- .put(MetricLabels.CLIENT_ID,
messageMeterProvider.getClient().getClientId())
+ .put(MetricLabels.CLIENT_ID,
clientMeterProvider.getClient().getClientId())
.put(MetricLabels.INVOCATION_STATUS,
invocationStatus.getName()).build();
histogram.record(duration.toMillis(), attributes);
}
@@ -65,7 +65,7 @@ public class MetricMessageInterceptor implements
MessageInterceptor {
if (messageCommons.isEmpty()) {
return;
}
- final ClientImpl client = messageMeterProvider.getClient();
+ final ClientImpl client = clientMeterProvider.getClient();
String consumerGroup = null;
if (client instanceof PushConsumer) {
consumerGroup = ((PushConsumer) client).getConsumerGroup();
@@ -85,7 +85,7 @@ public class MetricMessageInterceptor implements
MessageInterceptor {
final Timestamp deliveryTimestampFromRemote =
optionalDeliveryTimestampFromRemote.get();
final long latency = System.currentTimeMillis() -
Timestamps.toMillis(deliveryTimestampFromRemote);
final Optional<DoubleHistogram> optionalHistogram =
-
messageMeterProvider.getHistogramByName(MetricName.DELIVERY_LATENCY);
+
clientMeterProvider.getHistogramByName(MetricName.DELIVERY_LATENCY);
if (!optionalHistogram.isPresent()) {
return;
}
@@ -97,7 +97,7 @@ public class MetricMessageInterceptor implements
MessageInterceptor {
}
private void doBeforeConsumeMessage(List<MessageCommon> messageCommons) {
- final ClientImpl client = messageMeterProvider.getClient();
+ final ClientImpl client = clientMeterProvider.getClient();
String consumerGroup = null;
if (client instanceof PushConsumer) {
consumerGroup = ((PushConsumer) client).getConsumerGroup();
@@ -116,7 +116,7 @@ public class MetricMessageInterceptor implements
MessageInterceptor {
.put(MetricLabels.CONSUMER_GROUP, consumerGroup)
.put(MetricLabels.CLIENT_ID, client.getClientId()).build();
final Optional<DoubleHistogram> optionalHistogram =
- messageMeterProvider.getHistogramByName(MetricName.AWAIT_TIME);
+ clientMeterProvider.getHistogramByName(MetricName.AWAIT_TIME);
if (!optionalHistogram.isPresent()) {
return;
}
@@ -126,7 +126,7 @@ public class MetricMessageInterceptor implements
MessageInterceptor {
private void doAfterProcessMessage(List<MessageCommon> messageCommons,
Duration duration,
MessageHookPointsStatus status) {
- final ClientImpl client = messageMeterProvider.getClient();
+ final ClientImpl client = clientMeterProvider.getClient();
if (!(client instanceof PushConsumer)) {
// Should never reach here.
LOGGER.error("[Bug] current client is not push consumer,
clientId={}", client.getClientId());
@@ -138,11 +138,11 @@ public class MetricMessageInterceptor implements
MessageInterceptor {
InvocationStatus.FAILURE;
Attributes attributes =
Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
.put(MetricLabels.CONSUMER_GROUP,
pushConsumer.getConsumerGroup())
- .put(MetricLabels.CLIENT_ID,
messageMeterProvider.getClient().getClientId())
+ .put(MetricLabels.CLIENT_ID,
clientMeterProvider.getClient().getClientId())
.put(MetricLabels.INVOCATION_STATUS,
invocationStatus.getName())
.build();
final Optional<DoubleHistogram> optionalHistogram =
-
messageMeterProvider.getHistogramByName(MetricName.PROCESS_TIME);
+
clientMeterProvider.getHistogramByName(MetricName.PROCESS_TIME);
if (!optionalHistogram.isPresent()) {
return;
}
@@ -153,7 +153,7 @@ public class MetricMessageInterceptor implements
MessageInterceptor {
@Override
public void doBefore(MessageHookPoints messageHookPoints,
List<MessageCommon> messageCommons) {
- if (!messageMeterProvider.isEnabled()) {
+ if (!clientMeterProvider.isEnabled()) {
return;
}
if (MessageHookPoints.CONSUME.equals(messageHookPoints)) {
@@ -164,7 +164,7 @@ public class MetricMessageInterceptor implements
MessageInterceptor {
@Override
public void doAfter(MessageHookPoints messageHookPoints,
List<MessageCommon> messageCommons, Duration duration,
MessageHookPointsStatus status) {
- if (!messageMeterProvider.isEnabled()) {
+ if (!clientMeterProvider.isEnabled()) {
return;
}
switch (messageHookPoints) {
diff --git a/java/client/src/main/resources/logback.xml
b/java/client/src/main/resources/logback.xml
index 5e85f1e..5e5aabe 100644
--- a/java/client/src/main/resources/logback.xml
+++ b/java/client/src/main/resources/logback.xml
@@ -33,6 +33,5 @@
<appender-ref ref="DefaultAppender" additivity="false"/>
</root>
<!-- ref: https://github.com/grpc/grpc-java/issues/3033 -->
- <logger name="io.grpc" level="error"/>
- <logger name="org.apache.rocketmq.shaded.io.grpc" level="error"/>
+ <logger name="io.grpc" level="${rocketmq.grpc.log.level:-error}"/>
</configuration>
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
index cb37b6b..b1251fb 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.client.java.impl.consumer;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
@@ -38,13 +37,6 @@ public class PushConsumerBuilderImplTest extends TestBase {
builder.setConsumerGroup(null);
}
- @Test(expected = IllegalArgumentException.class)
- public void testSetConsumerGroupWithTooLong() {
- final PushConsumerBuilderImpl builder = new PushConsumerBuilderImpl();
- String tooLongConsumerGroup = StringUtils.repeat("a", 256);
- builder.setConsumerGroup(tooLongConsumerGroup);
- }
-
@Test(expected = NullPointerException.class)
public void testSetMessageListenerWithNull() {
final PushConsumerBuilderImpl builder = new PushConsumerBuilderImpl();
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
index 31609c4..f3479e8 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.client.java.impl.consumer;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.tool.TestBase;
@@ -37,13 +36,6 @@ public class SimpleConsumerBuilderTest extends TestBase {
builder.setConsumerGroup(null);
}
- @Test(expected = IllegalArgumentException.class)
- public void testSetConsumerGroupWithTooLong() {
- final SimpleConsumerBuilderImpl builder = new
SimpleConsumerBuilderImpl();
- String tooLongConsumerGroup = StringUtils.repeat("a", 256);
- builder.setConsumerGroup(tooLongConsumerGroup);
- }
-
@Test(expected = IllegalArgumentException.class)
public void testBuildWithoutExpressions() throws ClientException {
final SimpleConsumerBuilderImpl builder = new
SimpleConsumerBuilderImpl();
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
index 6224628..6f98f3f 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.client.java.impl.producer;
-import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@@ -38,13 +37,6 @@ public class ProducerBuilderImplTest {
builder.setTopics(null);
}
- @Test(expected = IllegalArgumentException.class)
- public void testSetTopicWithTooLong() {
- final ProducerBuilderImpl builder = new ProducerBuilderImpl();
- String tooLongTopic = StringUtils.repeat("a", 128);
- builder.setTopics(tooLongTopic);
- }
-
@Test(expected = IllegalArgumentException.class)
public void testSetNegativeMaxAttempts() {
final ProducerBuilderImpl builder = new ProducerBuilderImpl();
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
index be03d1d..ea31921 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
@@ -40,12 +40,6 @@ public class MessageImplTest extends TestBase {
provider.newMessageBuilder().setTopic(null);
}
- @Test(expected = IllegalArgumentException.class)
- public void testTopicSetterWithLengthEquals128() {
- String topicWithLengthEquals128 = new String(new
char[128]).replace("\0", "a");
- provider.newMessageBuilder().setTopic(topicWithLengthEquals128);
- }
-
@Test(expected = IllegalArgumentException.class)
public void testTagSetterWithVerticalBar() {
provider.newMessageBuilder().setTag("|");