This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 1191f5be Rename LOGGER to log
1191f5be is described below
commit 1191f5bed7286ece27fe7daca6ba7804aec56757
Author: Aaron Ai <[email protected]>
AuthorDate: Mon Nov 14 17:22:14 2022 +0800
Rename LOGGER to log
---
.../client/java/example/AsyncProducerExample.java | 6 +-
.../java/example/AsyncSimpleConsumerExample.java | 10 +--
.../java/example/ProducerDelayMessageExample.java | 6 +-
.../java/example/ProducerFifoMessageExample.java | 6 +-
.../java/example/ProducerNormalMessageExample.java | 6 +-
.../example/ProducerTransactionMessageExample.java | 8 +--
.../client/java/example/PushConsumerExample.java | 4 +-
.../client/java/example/SimpleConsumerExample.java | 8 +--
.../client/java/exception/StatusChecker.java | 4 +-
.../java/hook/CompositedMessageInterceptor.java | 6 +-
.../rocketmq/client/java/impl/ClientImpl.java | 78 +++++++++++-----------
.../client/java/impl/ClientManagerImpl.java | 36 +++++-----
.../client/java/impl/ClientSessionImpl.java | 36 +++++-----
.../client/java/impl/consumer/ConsumeService.java | 4 +-
.../client/java/impl/consumer/ConsumeTask.java | 4 +-
.../client/java/impl/consumer/ConsumerImpl.java | 10 +--
.../java/impl/consumer/FifoConsumeService.java | 4 +-
.../java/impl/consumer/ProcessQueueImpl.java | 68 +++++++++----------
.../java/impl/consumer/PushConsumerImpl.java | 48 ++++++-------
.../impl/consumer/PushSubscriptionSettings.java | 6 +-
.../java/impl/consumer/SimpleConsumerImpl.java | 22 +++---
.../impl/consumer/SimpleSubscriptionSettings.java | 6 +-
.../java/impl/consumer/StandardConsumeService.java | 6 +-
.../client/java/impl/producer/ProducerImpl.java | 42 ++++++------
.../java/impl/producer/PublishingSettings.java | 4 +-
.../client/java/message/MessageViewImpl.java | 12 ++--
.../rocketmq/client/java/metrics/ClientMeter.java | 8 +--
.../client/java/metrics/ClientMeterManager.java | 10 +--
.../java/metrics/MessageMeterInterceptor.java | 10 +--
.../rocketmq/client/java/rpc/AuthInterceptor.java | 4 +-
.../client/java/rpc/LoggingInterceptor.java | 10 +--
31 files changed, 246 insertions(+), 246 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
index 10354445..832be4d5 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
@@ -32,7 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AsyncProducerExample {
- private static final Logger LOGGER =
LoggerFactory.getLogger(AsyncProducerExample.class);
+ private static final Logger log =
LoggerFactory.getLogger(AsyncProducerExample.class);
private AsyncProducerExample() {
}
@@ -74,9 +74,9 @@ public class AsyncProducerExample {
final CompletableFuture<SendReceipt> future =
producer.sendAsync(message);
future.whenComplete((sendReceipt, throwable) -> {
if (null == throwable) {
- LOGGER.info("Send message successfully, messageId={}",
sendReceipt.getMessageId());
+ log.info("Send message successfully, messageId={}",
sendReceipt.getMessageId());
} else {
- LOGGER.error("Failed to send message", throwable);
+ log.error("Failed to send message", throwable);
}
});
// Block to avoid exist of background threads.
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index 2b660589..594f6367 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -38,7 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AsyncSimpleConsumerExample {
- private static final Logger LOGGER =
LoggerFactory.getLogger(AsyncSimpleConsumerExample.class);
+ private static final Logger log =
LoggerFactory.getLogger(AsyncSimpleConsumerExample.class);
private AsyncSimpleConsumerExample() {
}
@@ -77,21 +77,21 @@ public class AsyncSimpleConsumerExample {
Duration invisibleDuration = Duration.ofSeconds(15);
final CompletableFuture<List<MessageView>> future0 =
consumer.receiveAsync(maxMessageNum, invisibleDuration);
future0.thenAccept(messages -> {
- LOGGER.info("Received {} message(s)", messages.size());
+ log.info("Received {} message(s)", messages.size());
// Using messageView as key rather than message id because message
id may be duplicated.
final Map<MessageView, CompletableFuture<Void>> map =
messages.stream().collect(Collectors.toMap(message -> message,
consumer::ackAsync));
for (Map.Entry<MessageView, CompletableFuture<Void>> entry :
map.entrySet()) {
final MessageId messageId = entry.getKey().getMessageId();
final CompletableFuture<Void> future = entry.getValue();
- future.thenAccept(v -> LOGGER.info("Message is acknowledged
successfully, messageId={}", messageId))
+ future.thenAccept(v -> log.info("Message is acknowledged
successfully, messageId={}", messageId))
.exceptionally(throwable -> {
- LOGGER.error("Message is failed to be acknowledged,
messageId={}", messageId);
+ log.error("Message is failed to be acknowledged,
messageId={}", messageId);
return null;
});
}
}).exceptionally(t -> {
- LOGGER.error("Failed to receive message from remote", t);
+ log.error("Failed to receive message from remote", t);
return null;
});
// Block to avoid exist of background threads.
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
index 9d778b61..5372a7e8 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
@@ -32,7 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerDelayMessageExample {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProducerDelayMessageExample.class);
+ private static final Logger log =
LoggerFactory.getLogger(ProducerDelayMessageExample.class);
private ProducerDelayMessageExample() {
}
@@ -76,9 +76,9 @@ public class ProducerDelayMessageExample {
.build();
try {
final SendReceipt sendReceipt = producer.send(message);
- LOGGER.info("Send message successfully, messageId={}",
sendReceipt.getMessageId());
+ log.info("Send message successfully, messageId={}",
sendReceipt.getMessageId());
} catch (Throwable t) {
- LOGGER.error("Failed to send message", t);
+ log.error("Failed to send message", t);
}
// Close the producer when you don't need it anymore.
producer.close();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
index 6af757f9..ca0ce397 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
@@ -31,7 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerFifoMessageExample {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProducerFifoMessageExample.class);
+ private static final Logger log =
LoggerFactory.getLogger(ProducerFifoMessageExample.class);
private ProducerFifoMessageExample() {
}
@@ -74,9 +74,9 @@ public class ProducerFifoMessageExample {
.build();
try {
final SendReceipt sendReceipt = producer.send(message);
- LOGGER.info("Send message successfully, messageId={}",
sendReceipt.getMessageId());
+ log.info("Send message successfully, messageId={}",
sendReceipt.getMessageId());
} catch (Throwable t) {
- LOGGER.error("Failed to send message", t);
+ log.error("Failed to send message", t);
}
// Close the producer when you don't need it anymore.
producer.close();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
index 1b406f5f..f19ef734 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
@@ -31,7 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerNormalMessageExample {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProducerNormalMessageExample.class);
+ private static final Logger log =
LoggerFactory.getLogger(ProducerNormalMessageExample.class);
private ProducerNormalMessageExample() {
}
@@ -72,9 +72,9 @@ public class ProducerNormalMessageExample {
.build();
try {
final SendReceipt sendReceipt = producer.send(message);
- LOGGER.info("Send message successfully, messageId={}",
sendReceipt.getMessageId());
+ log.info("Send message successfully, messageId={}",
sendReceipt.getMessageId());
} catch (Throwable t) {
- LOGGER.error("Failed to send message", t);
+ log.error("Failed to send message", t);
}
// Close the producer when you don't need it anymore.
producer.close();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
index f3b7a951..a3f90f2f 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerTransactionMessageExample {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProducerTransactionMessageExample.class);
+ private static final Logger log =
LoggerFactory.getLogger(ProducerTransactionMessageExample.class);
private ProducerTransactionMessageExample() {
}
@@ -55,7 +55,7 @@ public class ProducerTransactionMessageExample {
.build();
String topic = "yourTransactionTopic";
TransactionChecker checker = messageView -> {
- LOGGER.info("Receive transactional message check, message={}",
messageView);
+ log.info("Receive transactional message check, message={}",
messageView);
// Return the transaction resolution according to your business
logic.
return TransactionResolution.COMMIT;
};
@@ -82,9 +82,9 @@ public class ProducerTransactionMessageExample {
.build();
try {
final SendReceipt sendReceipt = producer.send(message,
transaction);
- LOGGER.info("Send transaction message successfully, messageId={}",
sendReceipt.getMessageId());
+ log.info("Send transaction message successfully, messageId={}",
sendReceipt.getMessageId());
} catch (Throwable t) {
- LOGGER.error("Failed to send message", t);
+ log.error("Failed to send message", t);
return;
}
// Commit the transaction.
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
index 377c4ee1..04967a75 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
@@ -32,7 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PushConsumerExample {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PushConsumerExample.class);
+ private static final Logger log =
LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
@@ -63,7 +63,7 @@ public class PushConsumerExample {
.setSubscriptionExpressions(Collections.singletonMap(topic,
filterExpression))
.setMessageListener(messageView -> {
// Handle the received message and return consume result.
- LOGGER.info("Consume message={}", messageView);
+ log.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
index a42d4c34..c3f06a1a 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleConsumerExample {
- private static final Logger LOGGER =
LoggerFactory.getLogger(SimpleConsumerExample.class);
+ private static final Logger log =
LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
@@ -73,14 +73,14 @@ public class SimpleConsumerExample {
// Set message invisible duration after it is received.
Duration invisibleDuration = Duration.ofSeconds(15);
final List<MessageView> messages = consumer.receive(maxMessageNum,
invisibleDuration);
- LOGGER.info("Received {} message(s)", messages.size());
+ log.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
consumer.ack(message);
- LOGGER.info("Message is acknowledged successfully,
messageId={}", messageId);
+ log.info("Message is acknowledged successfully, messageId={}",
messageId);
} catch (Throwable t) {
- LOGGER.error("Message is failed to be acknowledged,
messageId={}", messageId, t);
+ log.error("Message is failed to be acknowledged,
messageId={}", messageId, t);
}
}
// Close the simple consumer when you don't need it anymore.
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
index 66298270..76f5a225 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StatusChecker {
- private static final Logger LOGGER =
LoggerFactory.getLogger(StatusChecker.class);
+ private static final Logger log =
LoggerFactory.getLogger(StatusChecker.class);
private StatusChecker() {
}
@@ -97,7 +97,7 @@ public class StatusChecker {
case VERIFY_FIFO_MESSAGE_UNSUPPORTED:
throw new UnsupportedException(codeNumber, requestId,
statusMessage);
default:
- LOGGER.warn("Unrecognized status code={}, requestId={},
statusMessage={}, clientVersion={}",
+ log.warn("Unrecognized status code={}, requestId={},
statusMessage={}, clientVersion={}",
codeNumber, requestId, statusMessage,
MetadataUtils.getVersion());
throw new UnsupportedException(codeNumber, requestId,
statusMessage);
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
index c0ddaef3..16562c86 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
@@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings("rawtypes")
public class CompositedMessageInterceptor implements MessageInterceptor {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MessageInterceptor.class);
+ private static final Logger log =
LoggerFactory.getLogger(MessageInterceptor.class);
private static final AttributeKey<Map<Integer, Map<AttributeKey,
Attribute>>> INTERCEPTOR_ATTRIBUTES_KEY =
AttributeKey.create("composited_interceptor_attributes");
private final List<MessageInterceptor> interceptors;
@@ -46,7 +46,7 @@ public class CompositedMessageInterceptor implements
MessageInterceptor {
try {
interceptor.doBefore(context, messages);
} catch (Throwable t) {
- LOGGER.error("Exception raised while handing messages", t);
+ log.error("Exception raised while handing messages", t);
}
final Map<AttributeKey, Attribute> attributes =
context.getAttributes();
attributeMap.put(index, attributes);
@@ -68,7 +68,7 @@ public class CompositedMessageInterceptor implements
MessageInterceptor {
try {
interceptor.doAfter(context, messages);
} catch (Throwable t) {
- LOGGER.error("Exception raised while handing messages", t);
+ log.error("Exception raised while handing messages", t);
}
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 95eef3cc..aabdd9bb 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -94,7 +94,7 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
public abstract class ClientImpl extends AbstractIdleService implements
Client, ClientSessionHandler,
MessageInterceptor {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ClientImpl.class);
+ private static final Logger log =
LoggerFactory.getLogger(ClientImpl.class);
/**
* The telemetry timeout should not be too long, otherwise
* <a href="https://github.com/grpc/grpc-java/issues/7351">this issue</a>
may be triggered in JDK8 + macOS.
@@ -171,7 +171,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
new ThreadFactoryImpl("CommandExecutor", clientIdIndex));
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- LOGGER.info("JVM shutdown hook is invoked, clientId={}, state={}",
clientId, ClientImpl.this.state());
+ log.info("JVM shutdown hook is invoked, clientId={}, state={}",
clientId, ClientImpl.this.state());
ClientImpl.this.stopAsync().awaitTerminated();
}));
}
@@ -182,16 +182,16 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
*/
@Override
protected void startUp() throws Exception {
- LOGGER.info("Begin to start the rocketmq client, clientId={}",
clientId);
+ log.info("Begin to start the rocketmq client, clientId={}", clientId);
this.clientManager.startAsync().awaitRunning();
// Fetch topic route from remote.
- LOGGER.info("Begin to fetch topic(s) route data from remote during
client startup, clientId={}, topics={}",
+ log.info("Begin to fetch topic(s) route data from remote during client
startup, clientId={}, topics={}",
clientId, topics);
for (String topic : topics) {
final ListenableFuture<TopicRouteData> future =
fetchTopicRoute(topic);
future.get();
}
- LOGGER.info("Fetch topic route data from remote successfully during
startup, clientId={}, topics={}",
+ log.info("Fetch topic route data from remote successfully during
startup, clientId={}, topics={}",
clientId, topics);
// Update route cache periodically.
final ScheduledExecutorService scheduler =
clientManager.getScheduler();
@@ -199,10 +199,10 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
try {
updateRouteCache();
} catch (Throwable t) {
- LOGGER.error("Exception raised while updating topic route
cache, clientId={}", clientId, t);
+ log.error("Exception raised while updating topic route cache,
clientId={}", clientId, t);
}
}, 10, 30, TimeUnit.SECONDS);
- LOGGER.info("The rocketmq client starts successfully, clientId={}",
clientId);
+ log.info("The rocketmq client starts successfully, clientId={}",
clientId);
}
/**
@@ -210,27 +210,27 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
*/
@Override
protected void shutDown() throws InterruptedException {
- LOGGER.info("Begin to shutdown the rocketmq client, clientId={}",
clientId);
+ log.info("Begin to shutdown the rocketmq client, clientId={}",
clientId);
notifyClientTermination();
if (null != this.updateRouteCacheFuture) {
updateRouteCacheFuture.cancel(false);
}
telemetryCommandExecutor.shutdown();
if (!ExecutorServices.awaitTerminated(telemetryCommandExecutor)) {
- LOGGER.error("[Bug] Timeout to shutdown the telemetry command
executor, clientId={}", clientId);
+ log.error("[Bug] Timeout to shutdown the telemetry command
executor, clientId={}", clientId);
} else {
- LOGGER.info("Shutdown the telemetry command executor successfully,
clientId={}", clientId);
+ log.info("Shutdown the telemetry command executor successfully,
clientId={}", clientId);
}
- LOGGER.info("Begin to release all telemetry sessions, clientId={}",
clientId);
+ log.info("Begin to release all telemetry sessions, clientId={}",
clientId);
releaseClientSessions();
- LOGGER.info("Release all telemetry sessions successfully,
clientId={}", clientId);
+ log.info("Release all telemetry sessions successfully, clientId={}",
clientId);
clientManager.stopAsync().awaitTerminated();
clientCallbackExecutor.shutdown();
if (!ExecutorServices.awaitTerminated(clientCallbackExecutor)) {
- LOGGER.error("[Bug] Timeout to shutdown the client callback
executor, clientId={}", clientId);
+ log.error("[Bug] Timeout to shutdown the client callback executor,
clientId={}", clientId);
}
clientMeterManager.shutdown();
- LOGGER.info("Shutdown the rocketmq client successfully, clientId={}",
clientId);
+ log.info("Shutdown the rocketmq client successfully, clientId={}",
clientId);
}
@Override
@@ -239,7 +239,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
compositedMessageInterceptor.doBefore(context, generalMessages);
} catch (Throwable t) {
// Should never reach here.
- LOGGER.error("[Bug] Exception raised while handling messages,
clientId={}", clientId, t);
+ log.error("[Bug] Exception raised while handling messages,
clientId={}", clientId, t);
}
}
@@ -249,7 +249,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
compositedMessageInterceptor.doAfter(context, generalMessages);
} catch (Throwable t) {
// Should never reach here.
- LOGGER.error("[Bug] Exception raised while handling messages,
clientId={}", clientId, t);
+ log.error("[Bug] Exception raised while handling messages,
clientId={}", clientId, t);
}
}
@@ -298,14 +298,14 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
.build();
telemetry(endpoints, telemetryCommand);
} catch (Throwable t) {
- LOGGER.error("Failed to send thread stack trace to remote,
endpoints={}, nonce={}, clientId={}",
+ log.error("Failed to send thread stack trace to remote,
endpoints={}, nonce={}, clientId={}",
endpoints, nonce, clientId, t);
}
};
try {
telemetryCommandExecutor.submit(task);
} catch (Throwable t) {
- LOGGER.error("[Bug] Exception raised while submitting task to
print thread stack trace, endpoints={}, "
+ log.error("[Bug] Exception raised while submitting task to print
thread stack trace, endpoints={}, "
+ "nonce={}, clientId={}", endpoints, nonce, clientId, t);
}
}
@@ -337,7 +337,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
try {
telemetry(endpoints, command);
} catch (Throwable t) {
- LOGGER.error("Failed to telemeter settings, clientId={},
endpoints={}", clientId, endpoints, t);
+ log.error("Failed to telemeter settings, clientId={},
endpoints={}", clientId, endpoints, t);
}
}
}
@@ -347,7 +347,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
final ClientSessionImpl clientSession =
getClientSession(endpoints);
clientSession.write(command);
} catch (Throwable t) {
- LOGGER.error("Failed to fire write telemetry command, clientId={},
endpoints={}", clientId, endpoints, t);
+ log.error("Failed to fire write telemetry command, clientId={},
endpoints={}", clientId, endpoints, t);
}
}
@@ -414,7 +414,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
*/
@Override
public void onVerifyMessageCommand(Endpoints endpoints,
VerifyMessageCommand command) {
- LOGGER.warn("Ignore verify message command from remote, which is not
expected, clientId={}, command={}",
+ log.warn("Ignore verify message command from remote, which is not
expected, clientId={}, command={}",
clientId, command);
final String nonce = command.getNonce();
final Status status =
Status.newBuilder().setCode(Code.NOT_IMPLEMENTED).build();
@@ -426,7 +426,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
try {
telemetry(endpoints, telemetryCommand);
} catch (Throwable t) {
- LOGGER.warn("Failed to send message verification result,
clientId={}", clientId, t);
+ log.warn("Failed to send message verification result,
clientId={}", clientId, t);
}
}
@@ -438,12 +438,12 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
*/
@Override
public void onRecoverOrphanedTransactionCommand(Endpoints endpoints,
RecoverOrphanedTransactionCommand command) {
- LOGGER.warn("Ignore orphaned transaction recovery command from remote,
which is not expected, clientId={}, "
+ log.warn("Ignore orphaned transaction recovery command from remote,
which is not expected, clientId={}, "
+ "command={}", clientId, command);
}
private void updateRouteCache() {
- LOGGER.info("Start to update route cache for a new round,
clientId={}", clientId);
+ log.info("Start to update route cache for a new round, clientId={}",
clientId);
topicRouteCache.keySet().forEach(topic -> {
final ListenableFuture<TopicRouteData> future =
fetchTopicRoute(topic);
Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
@@ -453,7 +453,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
@Override
public void onFailure(Throwable t) {
- LOGGER.error("Failed to fetch topic route for update
cache, topic={}, clientId={}", topic,
+ log.error("Failed to fetch topic route for update cache,
topic={}, clientId={}", topic,
clientId, t);
}
}, MoreExecutors.directExecutor());
@@ -469,7 +469,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
* Notify remote that current client is prepared to be terminated.
*/
private void notifyClientTermination() {
- LOGGER.info("Notify remote that client is terminated, clientId={}",
clientId);
+ log.info("Notify remote that client is terminated, clientId={}",
clientId);
final Set<Endpoints> routeEndpointsSet = getTotalRouteEndpoints();
final NotifyClientTerminationRequest notifyClientTerminationRequest =
wrapNotifyClientTerminationRequest();
try {
@@ -479,7 +479,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
} catch (Throwable t) {
// Should never reach here.
- LOGGER.error("[Bug] Exception raised while notifying client's
termination, clientId={}", clientId, t);
+ log.error("[Bug] Exception raised while notifying client's
termination, clientId={}", clientId, t);
}
}
@@ -536,26 +536,26 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
final Status status = response.getStatus();
final Code code = status.getCode();
if (Code.OK != code) {
- LOGGER.warn("Failed to send heartbeat, code={}, status
message=[{}], endpoints={}, clientId={}",
+ log.warn("Failed to send heartbeat, code={}, status
message=[{}], endpoints={}, clientId={}",
code, status.getMessage(), endpoints, clientId);
return;
}
- LOGGER.info("Send heartbeat successfully, endpoints={},
clientId={}", endpoints, clientId);
+ log.info("Send heartbeat successfully, endpoints={},
clientId={}", endpoints, clientId);
final boolean removed = isolated.remove(endpoints);
if (removed) {
- LOGGER.info("Rejoin endpoints which is isolated
before, clientId={}, endpoints={}", clientId,
+ log.info("Rejoin endpoints which is isolated before,
clientId={}, endpoints={}", clientId,
endpoints);
}
}
@Override
public void onFailure(Throwable t) {
- LOGGER.warn("Failed to send heartbeat, endpoints={},
clientId={}", endpoints, clientId, t);
+ log.warn("Failed to send heartbeat, endpoints={},
clientId={}", endpoints, clientId, t);
}
}, MoreExecutors.directExecutor());
} catch (Throwable t) {
// Should never reach here.
- LOGGER.error("[Bug] Exception raised while preparing heartbeat,
endpoints={}, clientId={}", endpoints,
+ log.error("[Bug] Exception raised while preparing heartbeat,
endpoints={}, clientId={}", endpoints,
clientId, t);
}
}
@@ -581,13 +581,13 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
@Override
public void onSuccess(TopicRouteData topicRouteData) {
- LOGGER.info("Fetch topic route successfully, clientId={},
topic={}, topicRouteData={}", clientId,
+ log.info("Fetch topic route successfully, clientId={},
topic={}, topicRouteData={}", clientId,
topic, topicRouteData);
}
@Override
public void onFailure(Throwable t) {
- LOGGER.error("Failed to fetch topic route, clientId={},
topic={}", clientId, topic, t);
+ log.error("Failed to fetch topic route, clientId={},
topic={}", clientId, topic, t);
}
}, MoreExecutors.directExecutor());
return future;
@@ -654,18 +654,18 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
inflightRouteFutureTable.remove(topic);
if (null == newFutureSet) {
// Should never reach here.
- LOGGER.error("[Bug] in-flight route futures was empty,
topic={}, clientId={}", topic,
+ log.error("[Bug] in-flight route futures was empty,
topic={}, clientId={}", topic,
clientId);
return;
}
- LOGGER.debug("Fetch topic route successfully, topic={},
in-flight route future "
+ log.debug("Fetch topic route successfully, topic={},
in-flight route future "
+ "size={}, clientId={}", topic, newFutureSet.size(),
clientId);
for (SettableFuture<TopicRouteData> newFuture :
newFutureSet) {
newFuture.set(topicRouteData);
}
} catch (Throwable t) {
// Should never reach here.
- LOGGER.error("[Bug] Exception raised while update route
data, topic={}, clientId={}", topic,
+ log.error("[Bug] Exception raised while update route data,
topic={}, clientId={}", topic,
clientId, t);
} finally {
inflightRouteFutureLock.unlock();
@@ -680,10 +680,10 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
inflightRouteFutureTable.remove(topic);
if (null == newFutureSet) {
// Should never reach here.
- LOGGER.error("[Bug] in-flight route futures was empty,
topic={}, clientId={}", topic, clientId);
+ log.error("[Bug] in-flight route futures was empty,
topic={}, clientId={}", topic, clientId);
return;
}
- LOGGER.debug("Failed to fetch topic route, topic={},
in-flight route future " +
+ log.debug("Failed to fetch topic route, topic={},
in-flight route future " +
"size={}, clientId={}", topic, newFutureSet.size(),
clientId, t);
for (SettableFuture<TopicRouteData> future : newFutureSet)
{
future.setException(t);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index d90be989..4e41b7dd 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -89,7 +89,7 @@ public class ClientManagerImpl extends ClientManager {
public static final Duration SYNC_SETTINGS_DELAY = Duration.ofSeconds(1);
public static final Duration SYNC_SETTINGS_PERIOD = Duration.ofMinutes(5);
- private static final Logger LOGGER =
LoggerFactory.getLogger(ClientManagerImpl.class);
+ private static final Logger log =
LoggerFactory.getLogger(ClientManagerImpl.class);
private final Client client;
@@ -144,7 +144,7 @@ public class ClientManagerImpl extends ClientManager {
if (idleDuration.compareTo(RPC_CLIENT_MAX_IDLE_DURATION) > 0) {
it.remove();
rpcClient.shutdown();
- LOGGER.info("Rpc client has been idle for a long time,
endpoints={}, idleDuration={}, " +
+ log.info("Rpc client has been idle for a long time,
endpoints={}, idleDuration={}, " +
"rpcClientMaxIdleDuration={}, clientId={}",
endpoints, idleDuration,
RPC_CLIENT_MAX_IDLE_DURATION, client.getClientId());
}
@@ -183,7 +183,7 @@ public class ClientManagerImpl extends ClientManager {
try {
rpcClient = new RpcClientImpl(endpoints);
} catch (SSLException e) {
- LOGGER.error("Failed to get RPC client, endpoints={},
clientId={}", endpoints, client.getClientId(), e);
+ log.error("Failed to get RPC client, endpoints={},
clientId={}", endpoints, client.getClientId(), e);
throw new ClientException("Failed to generate RPC client", e);
}
rpcClientTable.put(endpoints, rpcClient);
@@ -365,13 +365,13 @@ public class ClientManagerImpl extends ClientManager {
@Override
protected void startUp() {
final ClientId clientId = client.getClientId();
- LOGGER.info("Begin to start the client manager, clientId={}",
clientId);
+ log.info("Begin to start the client manager, clientId={}", clientId);
scheduler.scheduleWithFixedDelay(
() -> {
try {
clearIdleRpcClients();
} catch (Throwable t) {
- LOGGER.error("Exception raised during the clearing of idle
rpc clients, clientId={}", clientId, t);
+ log.error("Exception raised during the clearing of idle
rpc clients, clientId={}", clientId, t);
}
},
RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY.toNanos(),
@@ -384,7 +384,7 @@ public class ClientManagerImpl extends ClientManager {
try {
client.doHeartbeat();
} catch (Throwable t) {
- LOGGER.error("Exception raised during heartbeat,
clientId={}", clientId, t);
+ log.error("Exception raised during heartbeat,
clientId={}", clientId, t);
}
},
HEART_BEAT_INITIAL_DELAY.toNanos(),
@@ -395,12 +395,12 @@ public class ClientManagerImpl extends ClientManager {
scheduler.scheduleWithFixedDelay(
() -> {
try {
- LOGGER.info("Start to log statistics, clientVersion={},
clientWrapperVersion={}, "
+ log.info("Start to log statistics, clientVersion={},
clientWrapperVersion={}, "
+ "clientEndpoints={}, clientId={}",
MetadataUtils.getVersion(),
MetadataUtils.getWrapperVersion(),
client.getEndpoints(), clientId);
client.doStats();
} catch (Throwable t) {
- LOGGER.error("Exception raised during statistics logging,
clientId={}", clientId, t);
+ log.error("Exception raised during statistics logging,
clientId={}", clientId, t);
}
},
LOG_STATS_INITIAL_DELAY.toNanos(),
@@ -413,26 +413,26 @@ public class ClientManagerImpl extends ClientManager {
try {
client.syncSettings();
} catch (Throwable t) {
- LOGGER.error("Exception raised during the setting
synchronization, clientId={}", clientId, t);
+ log.error("Exception raised during the setting
synchronization, clientId={}", clientId, t);
}
},
SYNC_SETTINGS_DELAY.toNanos(),
SYNC_SETTINGS_PERIOD.toNanos(),
TimeUnit.NANOSECONDS
);
- LOGGER.info("The client manager starts successfully, clientId={}",
clientId);
+ log.info("The client manager starts successfully, clientId={}",
clientId);
}
@Override
protected void shutDown() throws IOException {
final ClientId clientId = client.getClientId();
- LOGGER.info("Begin to shutdown the client manager, clientId={}",
clientId);
+ log.info("Begin to shutdown the client manager, clientId={}",
clientId);
scheduler.shutdown();
try {
if (!ExecutorServices.awaitTerminated(scheduler)) {
- LOGGER.error("[Bug] Timeout to shutdown the client scheduler,
clientId={}", clientId);
+ log.error("[Bug] Timeout to shutdown the client scheduler,
clientId={}", clientId);
} else {
- LOGGER.info("Shutdown the client scheduler successfully,
clientId={}", clientId);
+ log.info("Shutdown the client scheduler successfully,
clientId={}", clientId);
}
rpcClientTableLock.writeLock().lock();
try {
@@ -446,18 +446,18 @@ public class ClientManagerImpl extends ClientManager {
} finally {
rpcClientTableLock.writeLock().unlock();
}
- LOGGER.info("Shutdown all rpc client(s) successfully,
clientId={}", clientId);
+ log.info("Shutdown all rpc client(s) successfully, clientId={}",
clientId);
asyncWorker.shutdown();
if (!ExecutorServices.awaitTerminated(asyncWorker)) {
- LOGGER.error("[Bug] Timeout to shutdown the client async
worker, clientId={}", clientId);
+ log.error("[Bug] Timeout to shutdown the client async worker,
clientId={}", clientId);
} else {
- LOGGER.info("Shutdown the client async worker successfully,
clientId={}", clientId);
+ log.info("Shutdown the client async worker successfully,
clientId={}", clientId);
}
} catch (InterruptedException e) {
- LOGGER.error("[Bug] Unexpected exception raised while shutdown
client manager, clientId={}", clientId, e);
+ log.error("[Bug] Unexpected exception raised while shutdown client
manager, clientId={}", clientId, e);
throw new IOException(e);
}
- LOGGER.info("Shutdown the client manager successfully, clientId={}",
clientId);
+ log.info("Shutdown the client manager successfully, clientId={}",
clientId);
}
@SuppressWarnings("NullableProblems")
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index 0297b329..ba9a5805 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
*/
public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
static final Duration REQUEST_OBSERVER_RENEW_BACKOFF_DELAY =
Duration.ofSeconds(1);
- private static final Logger LOGGER =
LoggerFactory.getLogger(ClientSessionImpl.class);
+ private static final Logger log =
LoggerFactory.getLogger(ClientSessionImpl.class);
private static final long SETTINGS_INITIALIZATION_TIMEOUT_MILLIS = 3000;
private final ClientSessionHandler sessionHandler;
@@ -59,20 +59,20 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
final ClientId clientId = sessionHandler.getClientId();
try {
if (sessionHandler.isEndpointsDeprecated(endpoints)) {
- LOGGER.info("Endpoints is deprecated, no longer to renew
requestObserver, endpoints={}, clientId={}",
+ log.info("Endpoints is deprecated, no longer to renew
requestObserver, endpoints={}, clientId={}",
endpoints, clientId);
return;
}
- LOGGER.info("Try to renew requestObserver, endpoints={},
clientId={}", endpoints, clientId);
+ log.info("Try to renew requestObserver, endpoints={},
clientId={}", endpoints, clientId);
this.requestObserver = sessionHandler.telemetry(endpoints, this);
} catch (Throwable t) {
- LOGGER.error("Failed to renew requestObserver, attempt to renew
later, endpoints={}, delay={}, clientId={}",
+ log.error("Failed to renew requestObserver, attempt to renew
later, endpoints={}, delay={}, clientId={}",
endpoints, REQUEST_OBSERVER_RENEW_BACKOFF_DELAY, clientId, t);
sessionHandler.getScheduler().schedule(this::renewRequestObserver,
REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.toNanos(),
TimeUnit.NANOSECONDS);
return;
}
- LOGGER.info("Sync setting to remote after requestObserver is renewed,
endpoints={}, clientId={}", endpoints,
+ log.info("Sync setting to remote after requestObserver is renewed,
endpoints={}, clientId={}", endpoints,
clientId);
syncSettings0();
}
@@ -93,11 +93,11 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
public void release() {
final ClientId clientId = sessionHandler.getClientId();
if (null == requestObserver) {
- LOGGER.error("[Bug] request observer does not exist, no need to
release, endpoints={}, clientId={}",
+ log.error("[Bug] request observer does not exist, no need to
release, endpoints={}, clientId={}",
endpoints, clientId);
return;
}
- LOGGER.info("Begin to release client session, endpoints={},
clientId={}", endpoints, clientId);
+ log.info("Begin to release client session, endpoints={}, clientId={}",
endpoints, clientId);
try {
requestObserver.onCompleted();
} catch (Throwable ignore) {
@@ -107,7 +107,7 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
void write(TelemetryCommand command) {
if (null == requestObserver) {
- LOGGER.error("[Bug] Request observer does not exist, ignore
current command, endpoints={}, command={}, "
+ log.error("[Bug] Request observer does not exist, ignore current
command, endpoints={}, command={}, "
+ "clientId={}", endpoints, command,
sessionHandler.getClientId());
return;
}
@@ -121,7 +121,7 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
switch (command.getCommandCase()) {
case SETTINGS: {
final Settings settings = command.getSettings();
- LOGGER.info("Receive settings from remote, endpoints={},
clientId={}", endpoints, clientId);
+ log.info("Receive settings from remote, endpoints={},
clientId={}", endpoints, clientId);
sessionHandler.onSettingsCommand(endpoints, settings);
settingsSettableFuture.set(settings);
break;
@@ -129,14 +129,14 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
final RecoverOrphanedTransactionCommand
recoverOrphanedTransactionCommand =
command.getRecoverOrphanedTransactionCommand();
- LOGGER.info("Receive orphaned transaction recovery command
from remote, endpoints={}, "
+ log.info("Receive orphaned transaction recovery command
from remote, endpoints={}, "
+ "clientId={}", endpoints, clientId);
sessionHandler.onRecoverOrphanedTransactionCommand(endpoints,
recoverOrphanedTransactionCommand);
break;
}
case VERIFY_MESSAGE_COMMAND: {
final VerifyMessageCommand verifyMessageCommand =
command.getVerifyMessageCommand();
- LOGGER.info("Receive message verification command from
remote, endpoints={}, clientId={}",
+ log.info("Receive message verification command from
remote, endpoints={}, clientId={}",
endpoints, clientId);
sessionHandler.onVerifyMessageCommand(endpoints,
verifyMessageCommand);
break;
@@ -144,17 +144,17 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
case PRINT_THREAD_STACK_TRACE_COMMAND: {
final PrintThreadStackTraceCommand
printThreadStackTraceCommand =
command.getPrintThreadStackTraceCommand();
- LOGGER.info("Receive thread stack print command from
remote, endpoints={}, clientId={}",
+ log.info("Receive thread stack print command from remote,
endpoints={}, clientId={}",
endpoints, clientId);
sessionHandler.onPrintThreadStackTraceCommand(endpoints,
printThreadStackTraceCommand);
break;
}
default:
- LOGGER.warn("Receive unrecognized command from remote,
endpoints={}, command={}, clientId={}",
+ log.warn("Receive unrecognized command from remote,
endpoints={}, command={}, clientId={}",
endpoints, command, clientId);
}
} catch (Throwable t) {
- LOGGER.error("[Bug] unexpected exception raised while receiving
command from remote, command={}, "
+ log.error("[Bug] unexpected exception raised while receiving
command from remote, command={}, "
+ "clientId={}", command, clientId, t);
}
}
@@ -162,11 +162,11 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
@Override
public void onError(Throwable throwable) {
final ClientId clientId = sessionHandler.getClientId();
- LOGGER.error("Exception raised from stream response observer,
clientId={}, endpoints={}", clientId, endpoints,
+ log.error("Exception raised from stream response observer,
clientId={}, endpoints={}", clientId, endpoints,
throwable);
release();
if (!sessionHandler.isRunning()) {
- LOGGER.info("Session handler is not running, forgive to renew
request observer, clientId={}, "
+ log.info("Session handler is not running, forgive to renew request
observer, clientId={}, "
+ "endpoints={}", clientId, endpoints);
return;
}
@@ -177,10 +177,10 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
@Override
public void onCompleted() {
final ClientId clientId = sessionHandler.getClientId();
- LOGGER.info("Receive completion for stream response observer,
clientId={}, endpoints={}", clientId, endpoints);
+ log.info("Receive completion for stream response observer,
clientId={}, endpoints={}", clientId, endpoints);
release();
if (!sessionHandler.isRunning()) {
- LOGGER.info("Session handler is not running, forgive to renew
request observer, clientId={}, "
+ log.info("Session handler is not running, forgive to renew request
observer, clientId={}, "
+ "endpoints={}", clientId, endpoints);
return;
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
index 1e9f5856..40adb9fb 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings("NullableProblems")
public abstract class ConsumeService {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumeService.class);
+ private static final Logger log =
LoggerFactory.getLogger(ConsumeService.class);
protected final ClientId clientId;
private final MessageListener messageListener;
@@ -80,7 +80,7 @@ public abstract class ConsumeService {
@Override
public void onFailure(Throwable t) {
// Should never reach here.
- LOGGER.error("[Bug] Exception raised while submitting
scheduled consumption task, clientId={}",
+ log.error("[Bug] Exception raised while submitting
scheduled consumption task, clientId={}",
clientId, t);
}
}, MoreExecutors.directExecutor());
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
index 45fd9c95..90e34c5a 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumeTask implements Callable<ConsumeResult> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumeTask.class);
+ private static final Logger log =
LoggerFactory.getLogger(ConsumeTask.class);
private final ClientId clientId;
private final MessageListener messageListener;
@@ -63,7 +63,7 @@ public class ConsumeTask implements Callable<ConsumeResult> {
try {
consumeResult = messageListener.consume(messageView);
} catch (Throwable t) {
- LOGGER.error("Message listener raised an exception while consuming
messages, clientId={}", clientId, t);
+ log.error("Message listener raised an exception while consuming
messages, clientId={}", clientId, t);
// If exception was thrown during the period of message
consumption, mark it as failure.
consumeResult = ConsumeResult.FAILURE;
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 4add7887..465fdb23 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -65,7 +65,7 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
abstract class ConsumerImpl extends ClientImpl {
static final Pattern CONSUMER_GROUP_PATTERN =
Pattern.compile("^[%a-zA-Z0-9_-]+$");
- private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumerImpl.class);
+ private static final Logger log =
LoggerFactory.getLogger(ConsumerImpl.class);
private final String consumerGroup;
ConsumerImpl(ClientConfiguration clientConfiguration, String
consumerGroup, Set<String> topics) {
@@ -103,7 +103,7 @@ abstract class ConsumerImpl extends ClientImpl {
transportDeliveryTimestamp =
Timestamps.toMillis(deliveryTimestamp);
break;
default:
- LOGGER.warn("[Bug] Not recognized content for
receive message response, mq={}, " +
+ log.warn("[Bug] Not recognized content for receive
message response, mq={}, " +
"clientId={}, response={}", mq, clientId,
response);
}
}
@@ -117,7 +117,7 @@ abstract class ConsumerImpl extends ClientImpl {
}, MoreExecutors.directExecutor());
} catch (Throwable t) {
// Should never reach here.
- LOGGER.error("[Bug] Exception raised during message receiving,
mq={}, clientId={}", mq, clientId, t);
+ log.error("[Bug] Exception raised during message receiving, mq={},
clientId={}", mq, clientId, t);
return Futures.immediateFailedFuture(t);
}
}
@@ -196,7 +196,7 @@ abstract class ConsumerImpl extends ClientImpl {
MessageHookPointsStatus hookPointsStatus =
Code.OK.equals(code) ?
MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
if (!Code.OK.equals(code)) {
- LOGGER.error("Failed to change message invisible duration,
messageId={}, endpoints={}, code={}, " +
+ log.error("Failed to change message invisible duration,
messageId={}, endpoints={}, code={}, " +
"status message=[{}], clientId={}", messageId,
endpoints, code, status.getMessage(), clientId);
}
MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
@@ -209,7 +209,7 @@ abstract class ConsumerImpl extends ClientImpl {
MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
doAfter(context0, generalMessages);
- LOGGER.error("Exception raised while changing message
invisible duration, messageId={}, endpoints={}, "
+ log.error("Exception raised while changing message invisible
duration, messageId={}, endpoints={}, "
+ "clientId={}",
messageId, endpoints, clientId, t);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
index 8a6c7ca4..07fb6238 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings("UnstableApiUsage")
class FifoConsumeService extends ConsumeService {
- private static final Logger LOGGER =
LoggerFactory.getLogger(FifoConsumeService.class);
+ private static final Logger log =
LoggerFactory.getLogger(FifoConsumeService.class);
public FifoConsumeService(ClientId clientId, MessageListener
messageListener,
ThreadPoolExecutor consumptionExecutor, MessageInterceptor
messageInterceptor,
@@ -54,7 +54,7 @@ class FifoConsumeService extends ConsumeService {
final MessageViewImpl messageView = iterator.next();
if (messageView.isCorrupted()) {
// Discard corrupted message.
- LOGGER.error("Message is corrupted for FIFO consumption, prepare
to discard it, mq={}, messageId={}, "
+ log.error("Message is corrupted for FIFO consumption, prepare to
discard it, mq={}, messageId={}, "
+ "clientId={}", pq.getMessageQueue(),
messageView.getMessageId(), clientId);
pq.discardFifoMessage(messageView);
consumeIteratively(pq, iterator);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 4e31ff90..8443c293 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -76,7 +76,7 @@ class ProcessQueueImpl implements ProcessQueue {
static final Duration ACK_MESSAGE_FAILURE_BACKOFF_DELAY =
Duration.ofSeconds(1);
static final Duration CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY =
Duration.ofSeconds(1);
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProcessQueueImpl.class);
+ private static final Logger log =
LoggerFactory.getLogger(ProcessQueueImpl.class);
private static final Duration RECEIVING_FLOW_CONTROL_BACKOFF_DELAY =
Duration.ofMillis(20);
private static final Duration RECEIVING_FAILURE_BACKOFF_DELAY =
Duration.ofSeconds(1);
@@ -141,7 +141,7 @@ class ProcessQueueImpl implements ProcessQueue {
if (afterCacheFullDuration.compareTo(maxIdleDuration) < 0) {
return false;
}
- LOGGER.warn("Process queue is idle, idleDuration={},
maxIdleDuration={}, afterCacheFullDuration={}, mq={}, "
+ log.warn("Process queue is idle, idleDuration={}, maxIdleDuration={},
afterCacheFullDuration={}, mq={}, "
+ "clientId={}", idleDuration, maxIdleDuration,
afterCacheFullDuration, mq, consumer.getClientId());
return true;
}
@@ -184,14 +184,14 @@ class ProcessQueueImpl implements ProcessQueue {
final ClientId clientId = consumer.getClientId();
final ScheduledExecutorService scheduler = consumer.getScheduler();
try {
- LOGGER.info("Try to receive message later, mq={}, delay={},
clientId={}", mq, delay, clientId);
+ log.info("Try to receive message later, mq={}, delay={},
clientId={}", mq, delay, clientId);
scheduler.schedule(this::receiveMessage, delay.toNanos(),
TimeUnit.NANOSECONDS);
} catch (Throwable t) {
if (scheduler.isShutdown()) {
return;
}
// Should never reach here.
- LOGGER.error("[Bug] Failed to schedule message receiving request,
mq={}, clientId={}", mq, clientId, t);
+ log.error("[Bug] Failed to schedule message receiving request,
mq={}, clientId={}", mq, clientId, t);
onReceiveMessageException(t);
}
}
@@ -199,11 +199,11 @@ class ProcessQueueImpl implements ProcessQueue {
public void receiveMessage() {
final ClientId clientId = consumer.getClientId();
if (dropped) {
- LOGGER.info("Process queue has been dropped, no longer receive
message, mq={}, clientId={}", mq, clientId);
+ log.info("Process queue has been dropped, no longer receive
message, mq={}, clientId={}", mq, clientId);
return;
}
if (this.isCacheFull()) {
- LOGGER.warn("Process queue cache is full, would receive message
later, mq={}, clientId={}", mq, clientId);
+ log.warn("Process queue cache is full, would receive message
later, mq={}, clientId={}", mq, clientId);
receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL);
return;
}
@@ -213,7 +213,7 @@ class ProcessQueueImpl implements ProcessQueue {
private void receiveMessageImmediately() {
final ClientId clientId = consumer.getClientId();
if (!consumer.isRunning()) {
- LOGGER.info("Stop to receive message because consumer is not
running, mq={}, clientId={}", mq, clientId);
+ log.info("Stop to receive message because consumer is not running,
mq={}, clientId={}", mq, clientId);
return;
}
try {
@@ -243,7 +243,7 @@ class ProcessQueueImpl implements ProcessQueue {
onReceiveMessageResult(result);
} catch (Throwable t) {
// Should never reach here.
- LOGGER.error("[Bug] Exception raised while handling
receive result, mq={}, endpoints={}, "
+ log.error("[Bug] Exception raised while handling
receive result, mq={}, endpoints={}, "
+ "clientId={}", mq, endpoints, clientId, t);
onReceiveMessageException(t);
}
@@ -256,7 +256,7 @@ class ProcessQueueImpl implements ProcessQueue {
new MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
consumer.doAfter(context0, Collections.emptyList());
- LOGGER.error("Exception raised during message reception,
mq={}, endpoints={}, clientId={}", mq,
+ log.error("Exception raised during message reception,
mq={}, endpoints={}, clientId={}", mq,
endpoints, clientId, t);
onReceiveMessageException(t);
}
@@ -264,7 +264,7 @@ class ProcessQueueImpl implements ProcessQueue {
receptionTimes.getAndIncrement();
consumer.getReceptionTimes().getAndIncrement();
} catch (Throwable t) {
- LOGGER.error("Exception raised during message reception, mq={},
clientId={}", mq, clientId, t);
+ log.error("Exception raised during message reception, mq={},
clientId={}", mq, clientId, t);
onReceiveMessageException(t);
}
}
@@ -274,7 +274,7 @@ class ProcessQueueImpl implements ProcessQueue {
final long actualMessagesQuantity = this.cachedMessagesCount();
final ClientId clientId = consumer.getClientId();
if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity) {
- LOGGER.warn("Process queue total cached messages quantity exceeds
the threshold, threshold={}, actual={}," +
+ log.warn("Process queue total cached messages quantity exceeds the
threshold, threshold={}, actual={}," +
" mq={}, clientId={}", cacheMessageCountThresholdPerQueue,
actualMessagesQuantity, mq, clientId);
cacheFullNanoTime = System.nanoTime();
return true;
@@ -282,7 +282,7 @@ class ProcessQueueImpl implements ProcessQueue {
final int cacheMessageBytesThresholdPerQueue =
consumer.cacheMessageBytesThresholdPerQueue();
final long actualCachedMessagesBytes = this.cachedMessageBytes();
if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes) {
- LOGGER.warn("Process queue total cached messages memory exceeds
the threshold, threshold={} bytes," +
+ log.warn("Process queue total cached messages memory exceeds the
threshold, threshold={} bytes," +
" actual={} bytes, mq={}, clientId={}",
cacheMessageBytesThresholdPerQueue,
actualCachedMessagesBytes, mq, clientId);
cacheFullNanoTime = System.nanoTime();
@@ -293,7 +293,7 @@ class ProcessQueueImpl implements ProcessQueue {
@Override
public void discardMessage(MessageViewImpl messageView) {
- LOGGER.info("Discard message, mq={}, messageId={}, clientId={}", mq,
messageView.getMessageId(),
+ log.info("Discard message, mq={}, messageId={}, clientId={}", mq,
messageView.getMessageId(),
consumer.getClientId());
final ListenableFuture<Void> future = nackMessage(messageView);
future.addListener(() -> evictCache(messageView),
MoreExecutors.directExecutor());
@@ -301,7 +301,7 @@ class ProcessQueueImpl implements ProcessQueue {
@Override
public void discardFifoMessage(MessageViewImpl messageView) {
- LOGGER.info("Discard fifo message, mq={}, messageId={}, clientId={}",
mq, messageView.getMessageId(),
+ log.info("Discard fifo message, mq={}, messageId={}, clientId={}", mq,
messageView.getMessageId(),
consumer.getClientId());
final ListenableFuture<Void> future =
forwardToDeadLetterQueue(messageView);
future.addListener(() -> evictCache(messageView),
MoreExecutors.directExecutor());
@@ -381,7 +381,7 @@ class ProcessQueueImpl implements ProcessQueue {
final Status status = response.getStatus();
final Code code = status.getCode();
if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
- LOGGER.error("Failed to change invisible duration due to
the invalid receipt handle, forgive to "
+ log.error("Failed to change invisible duration due to the
invalid receipt handle, forgive to "
+ "retry, clientId={}, consumerGroup={},
messageId={}, attempt={}, mq={}, endpoints={}, "
+ "requestId={}, status message=[{}]", clientId,
consumerGroup, messageId, attempt, mq,
endpoints, requestId, status.getMessage());
@@ -390,7 +390,7 @@ class ProcessQueueImpl implements ProcessQueue {
}
// Log failure and retry later.
if (!Code.OK.equals(code)) {
- LOGGER.error("Failed to change invisible duration, would
retry later, clientId={}, "
+ log.error("Failed to change invisible duration, would
retry later, clientId={}, "
+ "consumerGroup={}, messageId={}, attempt={},
mq={}, endpoints={}, requestId={}, "
+ "status message=[{}]", clientId, consumerGroup,
messageId, attempt, mq, endpoints,
requestId, status.getMessage());
@@ -401,12 +401,12 @@ class ProcessQueueImpl implements ProcessQueue {
future0.setFuture(Futures.immediateVoidFuture());
// Log retries.
if (1 < attempt) {
- LOGGER.info("Finally, change invisible duration
successfully, clientId={}, consumerGroup={} "
+ log.info("Finally, change invisible duration successfully,
clientId={}, consumerGroup={} "
+ "messageId={}, attempt={}, mq={}, endpoints={},
requestId={}", clientId, consumerGroup,
messageId, attempt, mq, endpoints, requestId);
return;
}
- LOGGER.debug("Change invisible duration successfully,
clientId={}, consumerGroup={}, messageId={}, "
+ log.debug("Change invisible duration successfully,
clientId={}, consumerGroup={}, messageId={}, "
+ "mq={}, endpoints={}, requestId={}", clientId,
consumerGroup, messageId, mq, endpoints,
requestId);
}
@@ -414,7 +414,7 @@ class ProcessQueueImpl implements ProcessQueue {
@Override
public void onFailure(Throwable t) {
// Log failure and retry later.
- LOGGER.error("Exception raised while changing invisible
duration, would retry later, clientId={}, "
+ log.error("Exception raised while changing invisible duration,
would retry later, clientId={}, "
+ "consumerGroup={}, messageId={}, mq={},
endpoints={}", clientId, consumerGroup,
messageId, mq, endpoints, t);
changeInvisibleDurationLater(messageView, duration, 1 +
attempt, future0);
@@ -434,7 +434,7 @@ class ProcessQueueImpl implements ProcessQueue {
return;
}
// Should never reach here.
- LOGGER.error("[Bug] Failed to schedule message change invisible
duration request, mq={}, messageId={}, "
+ log.error("[Bug] Failed to schedule message change invisible
duration request, mq={}, messageId={}, "
+ "clientId={}", mq, messageId, consumer.getClientId());
changeInvisibleDurationLater(messageView, duration, 1 + attempt,
future);
}
@@ -452,7 +452,7 @@ class ProcessQueueImpl implements ProcessQueue {
if (ConsumeResult.FAILURE.equals(consumeResult) && attempt <
maxAttempts) {
final Duration nextAttemptDelay =
retryPolicy.getNextAttemptDelay(attempt);
attempt = messageView.incrementAndGetDeliveryAttempt();
- LOGGER.debug("Prepare to redeliver the fifo message because of the
consumption failure, maxAttempt={}," +
+ log.debug("Prepare to redeliver the fifo message because of the
consumption failure, maxAttempt={}," +
" attempt={}, mq={}, messageId={}, nextAttemptDelay={},
clientId={}", maxAttempts, attempt, mq,
messageId, nextAttemptDelay, clientId);
final ListenableFuture<ConsumeResult> future =
service.consume(messageView, nextAttemptDelay);
@@ -461,7 +461,7 @@ class ProcessQueueImpl implements ProcessQueue {
}
boolean ok = ConsumeResult.SUCCESS.equals(consumeResult);
if (!ok) {
- LOGGER.info("Failed to consume fifo message finally, run out of
attempt times, maxAttempts={}, "
+ log.info("Failed to consume fifo message finally, run out of
attempt times, maxAttempts={}, "
+ "attempt={}, mq={}, messageId={}, clientId={}", maxAttempts,
attempt, mq, messageId, clientId);
}
// Ack message or forward it to DLQ depends on consumption result.
@@ -493,7 +493,7 @@ class ProcessQueueImpl implements ProcessQueue {
final Code code = status.getCode();
// Log failure and retry later.
if (!Code.OK.equals(code)) {
- LOGGER.error("Failed to forward message to dead letter
queue, would attempt to re-forward later," +
+ log.error("Failed to forward message to dead letter queue,
would attempt to re-forward later," +
" clientId={}, consumerGroup={}, messageId={},
attempt={}, mq={}, endpoints={}, "
+ "requestId={}, code={}, status message={}",
clientId, consumerGroup, messageId, attempt,
mq, endpoints, requestId, code, status.getMessage());
@@ -504,12 +504,12 @@ class ProcessQueueImpl implements ProcessQueue {
future0.setFuture(Futures.immediateVoidFuture());
// Log retries.
if (1 < attempt) {
- LOGGER.info("Re-forward message to dead letter queue
successfully, clientId={}, consumerGroup={}, "
+ log.info("Re-forward message to dead letter queue
successfully, clientId={}, consumerGroup={}, "
+ "attempt={}, messageId={}, mq={}, endpoints={},
requestId={}", clientId, consumerGroup,
attempt, messageId, mq, endpoints, requestId);
return;
}
- LOGGER.info("Forward message to dead letter queue
successfully, clientId={}, consumerGroup={}, "
+ log.info("Forward message to dead letter queue successfully,
clientId={}, consumerGroup={}, "
+ "messageId={}, mq={}, endpoints={}, requestId={}",
clientId, consumerGroup, messageId, mq,
endpoints, requestId);
}
@@ -517,7 +517,7 @@ class ProcessQueueImpl implements ProcessQueue {
@Override
public void onFailure(Throwable t) {
// Log failure and retry later.
- LOGGER.error("Exception raised while forward message to DLQ,
would attempt to re-forward later, " +
+ log.error("Exception raised while forward message to DLQ,
would attempt to re-forward later, " +
"clientId={}, consumerGroup={}, attempt={},
messageId={}, mq={}", clientId, consumerGroup,
attempt, messageId, mq, t);
forwardToDeadLetterQueueLater(messageView, 1 + attempt,
future0);
@@ -536,7 +536,7 @@ class ProcessQueueImpl implements ProcessQueue {
return;
}
// Should never reach here.
- LOGGER.error("[Bug] Failed to schedule DLQ message request, mq={},
messageId={}, clientId={}", mq,
+ log.error("[Bug] Failed to schedule DLQ message request, mq={},
messageId={}, clientId={}", mq,
messageView.getMessageId(), consumer.getClientId());
forwardToDeadLetterQueueLater(messageView, 1 + attempt, future0);
}
@@ -562,7 +562,7 @@ class ProcessQueueImpl implements ProcessQueue {
final Status status = response.getStatus();
final Code code = status.getCode();
if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
- LOGGER.error("Failed to ack message due to the invalid
receipt handle, forgive to retry, "
+ log.error("Failed to ack message due to the invalid
receipt handle, forgive to retry, "
+ "clientId={}, consumerGroup={}, messageId={},
attempt={}, mq={}, endpoints={}, "
+ "requestId={}, status message=[{}]", clientId,
consumerGroup, messageId, attempt, mq,
endpoints, requestId, status.getMessage());
@@ -571,7 +571,7 @@ class ProcessQueueImpl implements ProcessQueue {
}
// Log failure and retry later.
if (!Code.OK.equals(code)) {
- LOGGER.error("Failed to ack message, would attempt to
re-ack later, clientId={}, "
+ log.error("Failed to ack message, would attempt to re-ack
later, clientId={}, "
+ "consumerGroup={}, attempt={}, messageId={},
mq={}, code={}, requestId={}, endpoints={}, "
+ "status message=[{}]", clientId, consumerGroup,
attempt, messageId, mq, code, requestId,
endpoints, status.getMessage());
@@ -582,19 +582,19 @@ class ProcessQueueImpl implements ProcessQueue {
future0.setFuture(Futures.immediateVoidFuture());
// Log retries.
if (1 < attempt) {
- LOGGER.info("Finally, ack message successfully,
clientId={}, consumerGroup={}, attempt={}, "
+ log.info("Finally, ack message successfully, clientId={},
consumerGroup={}, attempt={}, "
+ "messageId={}, mq={}, endpoints={},
requestId={}", clientId, consumerGroup, attempt,
messageId, mq, endpoints, requestId);
return;
}
- LOGGER.debug("Ack message successfully, clientId={},
consumerGroup={}, messageId={}, mq={}, "
+ log.debug("Ack message successfully, clientId={},
consumerGroup={}, messageId={}, mq={}, "
+ "endpoints={}, requestId={}", clientId, consumerGroup,
messageId, mq, endpoints, requestId);
}
@Override
public void onFailure(Throwable t) {
// Log failure and retry later.
- LOGGER.error("Exception raised while acknowledging message,
clientId={}, consumerGroup={}, "
+ log.error("Exception raised while acknowledging message,
clientId={}, consumerGroup={}, "
+ "would attempt to re-ack later, attempt={},
messageId={}, mq={}, endpoints={}", clientId,
consumerGroup, attempt, messageId, mq, endpoints, t);
ackMessageLater(messageView, 1 + attempt, future0);
@@ -614,7 +614,7 @@ class ProcessQueueImpl implements ProcessQueue {
return;
}
// Should never reach here.
- LOGGER.error("[Bug] Failed to schedule message ack request, mq={},
messageId={}, clientId={}",
+ log.error("[Bug] Failed to schedule message ack request, mq={},
messageId={}, clientId={}",
mq, messageId, consumer.getClientId());
ackMessageLater(messageView, 1 + attempt, future);
}
@@ -638,7 +638,7 @@ class ProcessQueueImpl implements ProcessQueue {
public void doStats() {
final long receptionTimes = this.receptionTimes.getAndSet(0);
final long receivedMessagesQuantity =
this.receivedMessagesQuantity.getAndSet(0);
- LOGGER.info("Process queue stats: clientId={}, mq={},
receptionTimes={}, receivedMessageQuantity={}, "
+ log.info("Process queue stats: clientId={}, mq={}, receptionTimes={},
receivedMessageQuantity={}, "
+ "cachedMessageCount={}, cachedMessageBytes={}",
consumer.getClientId(), mq, receptionTimes,
receivedMessagesQuantity, this.getCachedMessageCount(),
this.getCachedMessageBytes());
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 81942276..6a07f6e0 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -87,7 +87,7 @@ import org.slf4j.LoggerFactory;
*/
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PushConsumerImpl.class);
+ private static final Logger log =
LoggerFactory.getLogger(PushConsumerImpl.class);
final AtomicLong consumptionOkQuantity;
final AtomicLong consumptionErrorQuantity;
@@ -154,7 +154,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
@Override
protected void startUp() throws Exception {
try {
- LOGGER.info("Begin to start the rocketmq push consumer,
clientId={}", clientId);
+ log.info("Begin to start the rocketmq push consumer, clientId={}",
clientId);
GaugeObserver gaugeObserver = new
ProcessQueueGaugeObserver(processQueueTable, clientId, consumerGroup);
this.clientMeterManager.setGaugeObserver(gaugeObserver);
super.startUp();
@@ -165,12 +165,12 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
try {
scanAssignments();
} catch (Throwable t) {
- LOGGER.error("Exception raised while scanning the load
assignments, clientId={}", clientId, t);
+ log.error("Exception raised while scanning the load
assignments, clientId={}", clientId, t);
}
}, 1, 5, TimeUnit.SECONDS);
- LOGGER.info("The rocketmq push consumer starts successfully,
clientId={}", clientId);
+ log.info("The rocketmq push consumer starts successfully,
clientId={}", clientId);
} catch (Throwable t) {
- LOGGER.error("Exception raised while starting the rocketmq push
consumer, clientId={}", clientId, t);
+ log.error("Exception raised while starting the rocketmq push
consumer, clientId={}", clientId, t);
shutDown();
throw t;
}
@@ -178,23 +178,23 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
@Override
protected void shutDown() throws InterruptedException {
- LOGGER.info("Begin to shutdown the rocketmq push consumer,
clientId={}", clientId);
+ log.info("Begin to shutdown the rocketmq push consumer, clientId={}",
clientId);
if (null != scanAssignmentsFuture) {
scanAssignmentsFuture.cancel(false);
}
super.shutDown();
this.consumptionExecutor.shutdown();
ExecutorServices.awaitTerminated(consumptionExecutor);
- LOGGER.info("Shutdown the rocketmq push consumer successfully,
clientId={}", clientId);
+ log.info("Shutdown the rocketmq push consumer successfully,
clientId={}", clientId);
}
private ConsumeService createConsumeService() {
final ScheduledExecutorService scheduler =
this.getClientManager().getScheduler();
if (pushSubscriptionSettings.isFifo()) {
- LOGGER.info("Create FIFO consume service, consumerGroup={},
clientId={}", consumerGroup, clientId);
+ log.info("Create FIFO consume service, consumerGroup={},
clientId={}", consumerGroup, clientId);
return new FifoConsumeService(clientId, messageListener,
consumptionExecutor, this, scheduler);
}
- LOGGER.info("Create standard consume service, consumerGroup={},
clientId={}", consumerGroup, clientId);
+ log.info("Create standard consume service, consumerGroup={},
clientId={}", consumerGroup, clientId);
return new StandardConsumeService(clientId, messageListener,
consumptionExecutor, this, scheduler);
}
@@ -225,7 +225,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
public PushConsumer subscribe(String topic, FilterExpression
filterExpression) throws ClientException {
// Check consumer status.
if (!this.isRunning()) {
- LOGGER.error("Unable to add subscription because push consumer is
not running, state={}, clientId={}",
+ log.error("Unable to add subscription because push consumer is not
running, state={}, clientId={}",
this.state(), clientId);
throw new IllegalStateException("Push consumer is not running
now");
}
@@ -242,7 +242,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
public PushConsumer unsubscribe(String topic) {
// Check consumer status.
if (!this.isRunning()) {
- LOGGER.error("Unable to remove subscription because push consumer
is not running, state={}, clientId={}",
+ log.error("Unable to remove subscription because push consumer is
not running, state={}, clientId={}",
this.state(), clientId);
throw new IllegalStateException("Push consumer is not running
now");
}
@@ -340,14 +340,14 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
}
if (!latest.contains(mq)) {
- LOGGER.info("Drop message queue according to the latest
assignmentList, mq={}, clientId={}", mq,
+ log.info("Drop message queue according to the latest
assignmentList, mq={}, clientId={}", mq,
clientId);
dropProcessQueue(mq);
continue;
}
if (pq.expired()) {
- LOGGER.warn("Drop message queue because it is expired, mq={},
clientId={}", mq, clientId);
+ log.warn("Drop message queue because it is expired, mq={},
clientId={}", mq, clientId);
dropProcessQueue(mq);
continue;
}
@@ -360,7 +360,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
}
final Optional<ProcessQueue> optionalProcessQueue =
createProcessQueue(mq, filterExpression);
if (optionalProcessQueue.isPresent()) {
- LOGGER.info("Start to fetch message from remote, mq={},
clientId={}", mq, clientId);
+ log.info("Start to fetch message from remote, mq={},
clientId={}", mq, clientId);
optionalProcessQueue.get().fetchMessageImmediately();
}
}
@@ -369,7 +369,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
@VisibleForTesting
void scanAssignments() {
try {
- LOGGER.debug("Start to scan assignments periodically,
clientId={}", clientId);
+ log.debug("Start to scan assignments periodically, clientId={}",
clientId);
for (Map.Entry<String, FilterExpression> entry :
subscriptionExpressions.entrySet()) {
final String topic = entry.getKey();
final FilterExpression filterExpression = entry.getValue();
@@ -380,22 +380,22 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
public void onSuccess(Assignments latest) {
if (latest.getAssignmentList().isEmpty()) {
if (null == existed ||
existed.getAssignmentList().isEmpty()) {
- LOGGER.info("Acquired empty assignments from
remote, would scan later, topic={}, "
+ log.info("Acquired empty assignments from
remote, would scan later, topic={}, "
+ "clientId={}", topic, clientId);
return;
}
- LOGGER.info("Attention!!! acquired empty
assignments from remote, but existed assignments"
+ log.info("Attention!!! acquired empty assignments
from remote, but existed assignments"
+ " is not empty, topic={}, clientId={}",
topic, clientId);
}
if (!latest.equals(existed)) {
- LOGGER.info("Assignments of topic={} has changed,
{} => {}, clientId={}", topic, existed,
+ log.info("Assignments of topic={} has changed, {}
=> {}, clientId={}", topic, existed,
latest, clientId);
syncProcessQueue(topic, latest, filterExpression);
cacheAssignments.put(topic, latest);
return;
}
- LOGGER.debug("Assignments of topic={} remains the
same, assignments={}, clientId={}", topic,
+ log.debug("Assignments of topic={} remains the same,
assignments={}, clientId={}", topic,
existed, clientId);
// Process queue may be dropped, need to be
synchronized anyway.
syncProcessQueue(topic, latest, filterExpression);
@@ -403,13 +403,13 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
@Override
public void onFailure(Throwable t) {
- LOGGER.error("Exception raised while scanning the
assignments, topic={}, clientId={}", topic,
+ log.error("Exception raised while scanning the
assignments, topic={}, clientId={}", topic,
clientId, t);
}
}, MoreExecutors.directExecutor());
}
} catch (Throwable t) {
- LOGGER.error("Exception raised while scanning the assignments for
all topics, clientId={}", clientId, t);
+ log.error("Exception raised while scanning the assignments for all
topics, clientId={}", clientId, t);
}
}
@@ -480,7 +480,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
try {
telemetry(endpoints, command);
} catch (Throwable t) {
- LOGGER.error("Failed to send message verification result
command, endpoints={}, command={}, "
+ log.error("Failed to send message verification result
command, endpoints={}, command={}, "
+ "messageId={}, clientId={}", endpoints, command,
messageId, clientId, t);
}
}
@@ -488,7 +488,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
@Override
public void onFailure(Throwable t) {
// Should never reach here.
- LOGGER.error("[Bug] Failed to get message verification result,
endpoints={}, messageId={}, "
+ log.error("[Bug] Failed to get message verification result,
endpoints={}, messageId={}, "
+ "clientId={}", endpoints, messageId, clientId, t);
}
}, MoreExecutors.directExecutor());
@@ -547,7 +547,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
final long consumptionOkQuantity =
this.consumptionOkQuantity.getAndSet(0);
final long consumptionErrorQuantity =
this.consumptionErrorQuantity.getAndSet(0);
- LOGGER.info("clientId={}, consumerGroup={}, receptionTimes={},
receivedMessagesQuantity={}, "
+ log.info("clientId={}, consumerGroup={}, receptionTimes={},
receivedMessagesQuantity={}, "
+ "consumptionOkQuantity={}, consumptionErrorQuantity={}",
clientId, consumerGroup, receptionTimes,
receivedMessagesQuantity, consumptionOkQuantity,
consumptionErrorQuantity);
processQueueTable.values().forEach(ProcessQueue::doStats);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
index 04422ac2..adb45fc0 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
@@ -41,7 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PushSubscriptionSettings extends Settings {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PushSubscriptionSettings.class);
+ private static final Logger log =
LoggerFactory.getLogger(PushSubscriptionSettings.class);
private final Resource group;
private final Map<String, FilterExpression> subscriptionExpressions;
@@ -86,7 +86,7 @@ public class PushSubscriptionSettings extends Settings {
expressionBuilder.setType(FilterType.SQL);
break;
default:
- LOGGER.warn("[Bug] Unrecognized filter type, type={}",
type);
+ log.warn("[Bug] Unrecognized filter type, type={}", type);
}
SubscriptionEntry subscriptionEntry =
SubscriptionEntry.newBuilder().setTopic(topic).setExpression(expressionBuilder.build()).build();
@@ -103,7 +103,7 @@ public class PushSubscriptionSettings extends Settings {
public void sync(apache.rocketmq.v2.Settings settings) {
final apache.rocketmq.v2.Settings.PubSubCase pubSubCase =
settings.getPubSubCase();
if
(!apache.rocketmq.v2.Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
- LOGGER.error("[Bug] Issued settings not match with the client
type, clientId={}, pubSubCase={}, "
+ log.error("[Bug] Issued settings not match with the client type,
clientId={}, pubSubCase={}, "
+ "clientType={}", clientId, pubSubCase, clientType);
return;
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index 310df59b..a21267d0 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory;
*/
@SuppressWarnings("UnstableApiUsage")
class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
- private static final Logger LOGGER =
LoggerFactory.getLogger(SimpleConsumerImpl.class);
+ private static final Logger log =
LoggerFactory.getLogger(SimpleConsumerImpl.class);
private final SimpleSubscriptionSettings simpleSubscriptionSettings;
private final String consumerGroup;
@@ -87,11 +87,11 @@ class SimpleConsumerImpl extends ConsumerImpl implements
SimpleConsumer {
@Override
protected void startUp() throws Exception {
try {
- LOGGER.info("Begin to start the rocketmq simple consumer,
clientId={}", clientId);
+ log.info("Begin to start the rocketmq simple consumer,
clientId={}", clientId);
super.startUp();
- LOGGER.info("The rocketmq simple consumer starts successfully,
clientId={}", clientId);
+ log.info("The rocketmq simple consumer starts successfully,
clientId={}", clientId);
} catch (Throwable t) {
- LOGGER.error("Failed to start the rocketmq simple consumer, try to
shutdown it, clientId={}", clientId, t);
+ log.error("Failed to start the rocketmq simple consumer, try to
shutdown it, clientId={}", clientId, t);
shutDown();
throw t;
}
@@ -99,9 +99,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements
SimpleConsumer {
@Override
protected void shutDown() throws InterruptedException {
- LOGGER.info("Begin to shutdown the rocketmq simple consumer,
clientId={}", clientId);
+ log.info("Begin to shutdown the rocketmq simple consumer,
clientId={}", clientId);
super.shutDown();
- LOGGER.info("Shutdown the rocketmq simple consumer successfully,
clientId={}", clientId);
+ log.info("Shutdown the rocketmq simple consumer successfully,
clientId={}", clientId);
}
/**
@@ -119,7 +119,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements
SimpleConsumer {
public SimpleConsumer subscribe(String topic, FilterExpression
filterExpression) throws ClientException {
// Check consumer status.
if (!this.isRunning()) {
- LOGGER.error("Unable to add subscription because simple consumer
is not running, state={}, clientId={}",
+ log.error("Unable to add subscription because simple consumer is
not running, state={}, clientId={}",
this.state(), clientId);
throw new IllegalStateException("Simple consumer is not running
now");
}
@@ -136,7 +136,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements
SimpleConsumer {
public SimpleConsumer unsubscribe(String topic) {
// Check consumer status.
if (!this.isRunning()) {
- LOGGER.error("Unable to remove subscription because simple
consumer is not running, state={}, "
+ log.error("Unable to remove subscription because simple consumer
is not running, state={}, "
+ "clientId={}", this.state(), clientId);
throw new IllegalStateException("Simple consumer is not running
now");
}
@@ -172,7 +172,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements
SimpleConsumer {
public ListenableFuture<List<MessageView>> receive0(int maxMessageNum,
Duration invisibleDuration) {
if (!this.isRunning()) {
- LOGGER.error("Unable to receive message because simple consumer is
not running, state={}, clientId={}",
+ log.error("Unable to receive message because simple consumer is
not running, state={}, clientId={}",
this.state(), clientId);
final IllegalStateException e = new IllegalStateException("Simple
consumer is not running now");
return Futures.immediateFailedFuture(e);
@@ -222,7 +222,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements
SimpleConsumer {
private ListenableFuture<Void> ack0(MessageView messageView) {
// Check consumer status.
if (!this.isRunning()) {
- LOGGER.error("Unable to ack message because simple consumer is not
running, state={}, clientId={}",
+ log.error("Unable to ack message because simple consumer is not
running, state={}, clientId={}",
this.state(), clientId);
final IllegalStateException e = new IllegalStateException("Simple
consumer is not running now");
return Futures.immediateFailedFuture(e);
@@ -262,7 +262,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements
SimpleConsumer {
public ListenableFuture<Void> changeInvisibleDuration0(MessageView
messageView, Duration invisibleDuration) {
// Check consumer status.
if (!this.isRunning()) {
- LOGGER.error("Unable to change invisible duration because simple
consumer is not running, state={}, "
+ log.error("Unable to change invisible duration because simple
consumer is not running, state={}, "
+ "clientId={}", this.state(), clientId);
final IllegalStateException e = new IllegalStateException("Simple
consumer is not running now");
return Futures.immediateFailedFuture(e);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
index 8d95c0f2..cadf6e89 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
@@ -38,7 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleSubscriptionSettings extends Settings {
- private static final Logger LOGGER =
LoggerFactory.getLogger(SimpleSubscriptionSettings.class);
+ private static final Logger log =
LoggerFactory.getLogger(SimpleSubscriptionSettings.class);
private final Resource group;
private final Duration longPollingTimeout;
@@ -70,7 +70,7 @@ public class SimpleSubscriptionSettings extends Settings {
expressionBuilder.setType(FilterType.SQL);
break;
default:
- LOGGER.warn("[Bug] Unrecognized filter type for simple
consumer, type={}", type);
+ log.warn("[Bug] Unrecognized filter type for simple
consumer, type={}", type);
}
SubscriptionEntry subscriptionEntry =
SubscriptionEntry.newBuilder().setTopic(topic).setExpression(expressionBuilder.build()).build();
@@ -88,7 +88,7 @@ public class SimpleSubscriptionSettings extends Settings {
public void sync(apache.rocketmq.v2.Settings settings) {
final apache.rocketmq.v2.Settings.PubSubCase pubSubCase =
settings.getPubSubCase();
if
(!apache.rocketmq.v2.Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
- LOGGER.error("[Bug] Issued settings not match with the client
type, clientId={}, pubSubCase={}, "
+ log.error("[Bug] Issued settings not match with the client type,
clientId={}, pubSubCase={}, "
+ "clientType={}", clientId, pubSubCase, clientType);
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
index eef6c7b9..2775077a 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings("NullableProblems")
public class StandardConsumeService extends ConsumeService {
- private static final Logger LOGGER =
LoggerFactory.getLogger(StandardConsumeService.class);
+ private static final Logger log =
LoggerFactory.getLogger(StandardConsumeService.class);
public StandardConsumeService(ClientId clientId, MessageListener
messageListener,
ThreadPoolExecutor consumptionExecutor, MessageInterceptor
messageInterceptor,
@@ -47,7 +47,7 @@ public class StandardConsumeService extends ConsumeService {
for (MessageViewImpl messageView : messageViews) {
// Discard corrupted message.
if (messageView.isCorrupted()) {
- LOGGER.error("Message is corrupted for standard consumption,
prepare to discard it, mq={}, "
+ log.error("Message is corrupted for standard consumption,
prepare to discard it, mq={}, "
+ "messageId={}, clientId={}", pq.getMessageQueue(),
messageView.getMessageId(), clientId);
pq.discardMessage(messageView);
continue;
@@ -62,7 +62,7 @@ public class StandardConsumeService extends ConsumeService {
@Override
public void onFailure(Throwable t) {
// Should never reach here.
- LOGGER.error("[Bug] Exception raised in consumption
callback, clientId={}", clientId, t);
+ log.error("[Bug] Exception raised in consumption callback,
clientId={}", clientId, t);
}
}, MoreExecutors.directExecutor());
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 70cb6aaa..820a09ae 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -86,7 +86,7 @@ import org.slf4j.LoggerFactory;
*/
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
class ProducerImpl extends ClientImpl implements Producer {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProducerImpl.class);
+ private static final Logger log =
LoggerFactory.getLogger(ProducerImpl.class);
protected final PublishingSettings publishingSettings;
final ConcurrentMap<String/* topic */, PublishingLoadBalancer>
publishingRouteDataCache;
@@ -109,11 +109,11 @@ class ProducerImpl extends ClientImpl implements Producer
{
@Override
protected void startUp() throws Exception {
try {
- LOGGER.info("Begin to start the rocketmq producer, clientId={}",
clientId);
+ log.info("Begin to start the rocketmq producer, clientId={}",
clientId);
super.startUp();
- LOGGER.info("The rocketmq producer starts successfully,
clientId={}", clientId);
+ log.info("The rocketmq producer starts successfully, clientId={}",
clientId);
} catch (Throwable t) {
- LOGGER.error("Failed to start the rocketmq producer, try to
shutdown it, clientId={}", clientId, t);
+ log.error("Failed to start the rocketmq producer, try to shutdown
it, clientId={}", clientId, t);
shutDown();
throw t;
}
@@ -121,9 +121,9 @@ class ProducerImpl extends ClientImpl implements Producer {
@Override
protected void shutDown() throws InterruptedException {
- LOGGER.info("Begin to shutdown the rocketmq producer, clientId={}",
clientId);
+ log.info("Begin to shutdown the rocketmq producer, clientId={}",
clientId);
super.shutDown();
- LOGGER.info("Shutdown the rocketmq producer successfully,
clientId={}", clientId);
+ log.info("Shutdown the rocketmq producer successfully, clientId={}",
clientId);
}
@Override
@@ -131,7 +131,7 @@ class ProducerImpl extends ClientImpl implements Producer {
final String transactionId = command.getTransactionId();
final String messageId =
command.getMessage().getSystemProperties().getMessageId();
if (null == checker) {
- LOGGER.error("No transaction checker registered, ignore it,
messageId={}, transactionId={}, endpoints={},"
+ log.error("No transaction checker registered, ignore it,
messageId={}, transactionId={}, endpoints={},"
+ " clientId={}", messageId, transactionId, endpoints,
clientId);
return;
}
@@ -139,7 +139,7 @@ class ProducerImpl extends ClientImpl implements Producer {
try {
messageView = MessageViewImpl.fromProtobuf(command.getMessage());
} catch (Throwable t) {
- LOGGER.error("[Bug] Failed to decode message during orphaned
transaction message recovery, messageId={}, "
+ log.error("[Bug] Failed to decode message during orphaned
transaction message recovery, messageId={}, "
+ "transactionId={}, endpoints={}, clientId={}", messageId,
transactionId, endpoints, clientId, t);
return;
}
@@ -164,14 +164,14 @@ class ProducerImpl extends ClientImpl implements Producer
{
endTransaction(endpoints, generalMessage,
messageView.getMessageId(),
transactionId, resolution);
} catch (Throwable t) {
- LOGGER.error("Exception raised while ending the
transaction, messageId={}, transactionId={}, "
+ log.error("Exception raised while ending the transaction,
messageId={}, transactionId={}, "
+ "endpoints={}, clientId={}", messageId,
transactionId, endpoints, clientId, t);
}
}
@Override
public void onFailure(Throwable t) {
- LOGGER.error("Exception raised while checking the transaction,
messageId={}, transactionId={}, "
+ log.error("Exception raised while checking the transaction,
messageId={}, transactionId={}, "
+ "endpoints={}, clientId={}", messageId, transactionId,
endpoints, clientId, t);
}
@@ -242,7 +242,7 @@ class ProducerImpl extends ClientImpl implements Producer {
public Transaction beginTransaction() {
checkNotNull(checker, "Transaction checker should not be null");
if (!this.isRunning()) {
- LOGGER.error("Unable to begin a transaction because producer is
not running, state={}, clientId={}",
+ log.error("Unable to begin a transaction because producer is not
running, state={}, clientId={}",
this.state(), clientId);
throw new IllegalStateException("Producer is not running now");
}
@@ -329,7 +329,7 @@ class ProducerImpl extends ClientImpl implements Producer {
if (!this.isRunning()) {
final IllegalStateException e = new
IllegalStateException("Producer is not running now");
future.setException(e);
- LOGGER.error("Unable to send message because producer is not
running, state={}, clientId={}",
+ log.error("Unable to send message because producer is not running,
state={}, clientId={}",
this.state(), clientId);
return future;
}
@@ -342,7 +342,7 @@ class ProducerImpl extends ClientImpl implements Producer {
pubMessages.add(pubMessage);
} catch (Throwable t) {
// Failed to refine message, no need to proceed.
- LOGGER.error("Failed to refine message to send, clientId={},
message={}", clientId, message, t);
+ log.error("Failed to refine message to send, clientId={},
message={}", clientId, message, t);
future.setException(t);
return future;
}
@@ -354,7 +354,7 @@ class ProducerImpl extends ClientImpl implements Producer {
// Messages have different topics, no need to proceed.
final IllegalArgumentException e = new
IllegalArgumentException("Messages to send have different topics");
future.setException(e);
- LOGGER.error("Messages to be sent have different topics, no need
to proceed, topic(s)={}, clientId={}",
+ log.error("Messages to be sent have different topics, no need to
proceed, topic(s)={}, clientId={}",
topics, clientId);
return future;
}
@@ -369,7 +369,7 @@ class ProducerImpl extends ClientImpl implements Producer {
final IllegalArgumentException e = new
IllegalArgumentException("Messages to send have different types, "
+ "please check");
future.setException(e);
- LOGGER.error("Messages to be sent have different message types, no
need to proceed, topic={}, messageType"
+ log.error("Messages to be sent have different message types, no
need to proceed, topic={}, messageType"
+ "(s)={}, clientId={}", topic, messageTypes, clientId, e);
return future;
}
@@ -387,7 +387,7 @@ class ProducerImpl extends ClientImpl implements Producer {
final IllegalArgumentException e = new
IllegalArgumentException("FIFO messages to send have different "
+ "message groups, messageGroups=" + messageGroups);
future.setException(e);
- LOGGER.error("FIFO messages to be sent have different message
groups, no need to proceed, topic={}, "
+ log.error("FIFO messages to be sent have different message
groups, no need to proceed, topic={}, "
+ "messageGroups={}, clientId={}", topic, messageGroups,
clientId, e);
return future;
}
@@ -484,7 +484,7 @@ class ProducerImpl extends ClientImpl implements Producer {
for (SendReceipt receipt : sendReceipts) {
messageIds.add(receipt.getMessageId());
}
- LOGGER.info("Resend message successfully, topic={},
messageId(s)={}, maxAttempts={}, "
+ log.info("Resend message successfully, topic={},
messageId(s)={}, maxAttempts={}, "
+ "attempt={}, endpoints={}, clientId={}", topic,
messageIds, maxAttempts, attempt,
endpoints, clientId);
}
@@ -508,7 +508,7 @@ class ProducerImpl extends ClientImpl implements Producer {
if (attempt >= maxAttempts) {
// No need more attempts.
future0.setException(t);
- LOGGER.error("Failed to send message(s) finally, run out
of attempt times, maxAttempts={}, " +
+ log.error("Failed to send message(s) finally, run out of
attempt times, maxAttempts={}, " +
"attempt={}, topic={}, messageId(s)={},
endpoints={}, clientId={}",
maxAttempts, attempt, topic, messageIds, endpoints,
clientId, t);
return;
@@ -516,7 +516,7 @@ class ProducerImpl extends ClientImpl implements Producer {
// No need more attempts for transactional message.
if (MessageType.TRANSACTION.equals(messageType)) {
future0.setException(t);
- LOGGER.error("Failed to send transactional message
finally, maxAttempts=1, attempt={}, " +
+ log.error("Failed to send transactional message finally,
maxAttempts=1, attempt={}, " +
"topic={}, messageId(s)={}, endpoints={},
clientId={}", attempt, topic, messageIds,
endpoints, clientId, t);
return;
@@ -525,14 +525,14 @@ class ProducerImpl extends ClientImpl implements Producer
{
int nextAttempt = 1 + attempt;
// Retry immediately if the request is not throttled.
if (!(t instanceof TooManyRequestsException)) {
- LOGGER.warn("Failed to send message, would attempt to
resend right now, maxAttempts={}, "
+ log.warn("Failed to send message, would attempt to resend
right now, maxAttempts={}, "
+ "attempt={}, topic={}, messageId(s)={},
endpoints={}, clientId={}", maxAttempts, attempt,
topic, messageIds, endpoints, clientId, t);
send0(future0, topic, messageType, candidates, messages,
nextAttempt);
return;
}
final Duration delay =
ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt);
- LOGGER.warn("Failed to send message due to too many requests,
would attempt to resend after {}, "
+ log.warn("Failed to send message due to too many requests,
would attempt to resend after {}, "
+ "maxAttempts={}, attempt={}, topic={},
messageId(s)={}, endpoints={}, clientId={}", delay,
maxAttempts, attempt, topic, messageIds, endpoints,
clientId, t);
ProducerImpl.this.getClientManager().getScheduler().schedule(() ->
send0(future0, topic, messageType,
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
index a3fc54d6..a4c5e629 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PublishingSettings extends Settings {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PublishingSettings.class);
+ private static final Logger log =
LoggerFactory.getLogger(PublishingSettings.class);
private final Set<String> topics;
/**
@@ -73,7 +73,7 @@ public class PublishingSettings extends Settings {
public void sync(apache.rocketmq.v2.Settings settings) {
final apache.rocketmq.v2.Settings.PubSubCase pubSubCase =
settings.getPubSubCase();
if
(!apache.rocketmq.v2.Settings.PubSubCase.PUBLISHING.equals(pubSubCase)) {
- LOGGER.error("[Bug] Issued settings not match with the client
type, clientId={}, pubSubCase={}, "
+ log.error("[Bug] Issued settings not match with the client type,
clientId={}, pubSubCase={}, "
+ "clientType={}", clientId, pubSubCase, clientType);
return;
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
index 027ca5b2..33ad88bb 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
@@ -47,7 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageViewImpl implements LinkedElement<MessageViewImpl>,
MessageView {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MessageViewImpl.class);
+ private static final Logger log =
LoggerFactory.getLogger(MessageViewImpl.class);
final byte[] body;
private final MessageId messageId;
@@ -269,7 +269,7 @@ public class MessageViewImpl implements
LinkedElement<MessageViewImpl>, MessageV
}
} catch (NoSuchAlgorithmException e) {
corrupted = true;
- LOGGER.error("MD5 is not supported unexpectedly, skip it,
topic={}, messageId={}", topic,
+ log.error("MD5 is not supported unexpectedly, skip it,
topic={}, messageId={}", topic,
messageId);
}
break;
@@ -281,12 +281,12 @@ public class MessageViewImpl implements
LinkedElement<MessageViewImpl>, MessageV
}
} catch (NoSuchAlgorithmException e) {
corrupted = true;
- LOGGER.error("SHA-1 is not supported unexpectedly, skip
it, topic={}, messageId={}", topic,
+ log.error("SHA-1 is not supported unexpectedly, skip it,
topic={}, messageId={}", topic,
messageId);
}
break;
default:
- LOGGER.error("Unsupported message body digest algorithm,
digestType={}, topic={}, messageId={}",
+ log.error("Unsupported message body digest algorithm,
digestType={}, topic={}, messageId={}",
digestType, topic, messageId);
}
final Encoding bodyEncoding = systemProperties.getBodyEncoding();
@@ -295,14 +295,14 @@ public class MessageViewImpl implements
LinkedElement<MessageViewImpl>, MessageV
try {
body = Utilities.uncompressBytesGzip(body);
} catch (IOException e) {
- LOGGER.error("Failed to uncompress message body, topic={},
messageId={}", topic, messageId);
+ log.error("Failed to uncompress message body, topic={},
messageId={}", topic, messageId);
corrupted = true;
}
break;
case IDENTITY:
break;
default:
- LOGGER.error("Unsupported message encoding algorithm,
topic={}, messageId={}, bodyEncoding={}", topic,
+ log.error("Unsupported message encoding algorithm, topic={},
messageId={}, bodyEncoding={}", topic,
messageId, bodyEncoding);
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
index 3326793c..e544681b 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
@@ -33,7 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientMeter {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ClientMeter.class);
+ private static final Logger log =
LoggerFactory.getLogger(ClientMeter.class);
private final boolean enabled;
private final Meter meter;
@@ -81,14 +81,14 @@ public class ClientMeter {
if (!enabled) {
return;
}
- LOGGER.info("Begin to shutdown client meter, clientId={},
endpoints={}", clientId, endpoints);
+ log.info("Begin to shutdown client meter, clientId={}, endpoints={}",
clientId, endpoints);
final CountDownLatch latch = new CountDownLatch(1);
provider.shutdown().whenComplete(latch::countDown);
try {
latch.await();
- LOGGER.info("Shutdown client meter successfully, clientId={},
endpoints={}", clientId, endpoints);
+ log.info("Shutdown client meter successfully, clientId={},
endpoints={}", clientId, endpoints);
} catch (Throwable t) {
- LOGGER.error("Failed to shutdown message meter, clientId={},
endpoints={}", clientId, endpoints, t);
+ log.error("Failed to shutdown message meter, clientId={},
endpoints={}", clientId, endpoints, t);
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java
index c882a5c2..1dd0dfa5 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java
@@ -47,7 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientMeterManager {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ClientMeterManager.class);
+ private static final Logger log =
LoggerFactory.getLogger(ClientMeterManager.class);
private static final Duration METRIC_EXPORTER_RPC_TIMEOUT =
Duration.ofSeconds(3);
private static final Duration METRIC_READER_INTERVAL =
Duration.ofMinutes(1);
@@ -80,12 +80,12 @@ public class ClientMeterManager {
public synchronized void reset(Metric metric) {
try {
if (clientMeter.satisfy(metric)) {
- LOGGER.info("Metric settings is satisfied by the current
message meter, metric={}, clientId={}",
+ log.info("Metric settings is satisfied by the current message
meter, metric={}, clientId={}",
metric, clientId);
return;
}
if (!metric.isOn()) {
- LOGGER.info("Metric is off, clientId={}", clientId);
+ log.info("Metric is off, clientId={}", clientId);
clientMeter.shutdown();
clientMeter = ClientMeter.disabledInstance(clientId);
return;
@@ -142,7 +142,7 @@ public class ClientMeterManager {
ClientMeter existedClientMeter = clientMeter;
clientMeter = new ClientMeter(meter, endpoints, provider,
clientId);
existedClientMeter.shutdown();
- LOGGER.info("Metrics is on, endpoints={}, clientId={}", endpoints,
clientId);
+ log.info("Metrics is on, endpoints={}, clientId={}", endpoints,
clientId);
final List<GaugeEnum> gauges = gaugeObserver.getGauges();
for (GaugeEnum gauge : gauges) {
@@ -159,7 +159,7 @@ public class ClientMeterManager {
});
}
} catch (Throwable t) {
- LOGGER.error("Exception raised when resetting message meter,
clientId={}", clientId, t);
+ log.error("Exception raised when resetting message meter,
clientId={}", clientId, t);
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
index b68dd55d..5f861b21 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
@@ -39,7 +39,7 @@ public class MessageMeterInterceptor implements
MessageInterceptor {
static final AttributeKey<Stopwatch> SEND_STOPWATCH_KEY =
AttributeKey.create("send_stopwatch");
static final AttributeKey<Stopwatch> CONSUME_STOPWATCH_KEY =
AttributeKey.create("consume_stopwatch");
- private static final Logger LOGGER =
LoggerFactory.getLogger(MessageMeterInterceptor.class);
+ private static final Logger log =
LoggerFactory.getLogger(MessageMeterInterceptor.class);
private final Client client;
private final ClientMeterManager meterManager;
@@ -83,7 +83,7 @@ public class MessageMeterInterceptor implements
MessageInterceptor {
consumerGroup = ((SimpleConsumer) client).getConsumerGroup();
}
if (null == consumerGroup) {
- LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}",
client.getClientId());
+ log.error("[Bug] consumerGroup is not recognized, clientId={}",
client.getClientId());
return;
}
final GeneralMessage message = messages.iterator().next();
@@ -95,7 +95,7 @@ public class MessageMeterInterceptor implements
MessageInterceptor {
final long currentTimeMillis = System.currentTimeMillis();
final long latency = currentTimeMillis - transportDeliveryTimestamp;
if (0 > latency) {
- LOGGER.debug("latency is negative, latency={}ms,
currentTimeMillis={}, transportDeliveryTimestamp={}",
+ log.debug("latency is negative, latency={}ms,
currentTimeMillis={}, transportDeliveryTimestamp={}",
latency, currentTimeMillis, transportDeliveryTimestamp);
return;
}
@@ -115,7 +115,7 @@ public class MessageMeterInterceptor implements
MessageInterceptor {
consumerGroup = ((PushConsumer) client).getConsumerGroup();
}
if (null == consumerGroup) {
- LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}",
client.getClientId());
+ log.error("[Bug] consumerGroup is not recognized, clientId={}",
client.getClientId());
return;
}
final GeneralMessage message = messages.iterator().next();
@@ -136,7 +136,7 @@ public class MessageMeterInterceptor implements
MessageInterceptor {
private void doAfterConsumeMessage(MessageInterceptorContext context,
List<GeneralMessage> messages) {
if (!(client instanceof PushConsumer)) {
// Should never reach here.
- LOGGER.error("[Bug] current client is not push consumer,
clientId={}", client.getClientId());
+ log.error("[Bug] current client is not push consumer,
clientId={}", client.getClientId());
return;
}
final Attribute<Stopwatch> stopwatchAttr =
context.getAttribute(CONSUME_STOPWATCH_KEY);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
index 31e5f4a6..14da7ddb 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
@@ -30,7 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AuthInterceptor implements ClientInterceptor {
- private static final Logger LOGGER =
LoggerFactory.getLogger(AuthInterceptor.class);
+ private static final Logger log =
LoggerFactory.getLogger(AuthInterceptor.class);
private final ClientConfiguration clientConfiguration;
private final ClientId clientId;
@@ -45,7 +45,7 @@ public class AuthInterceptor implements ClientInterceptor {
final Metadata metadata = Signature.sign(clientConfiguration,
clientId);
headers.merge(metadata);
} catch (Throwable t) {
- LOGGER.error("Failed to sign headers, clientId={}", clientId, t);
+ log.error("Failed to sign headers, clientId={}", clientId, t);
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
index bff53d74..45261401 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
* The client log interceptor based on grpc can track any remote procedure
call that interacts with the client locally.
*/
public class LoggingInterceptor implements ClientInterceptor {
- private static final Logger LOGGER =
LoggerFactory.getLogger(LoggingInterceptor.class);
+ private static final Logger log =
LoggerFactory.getLogger(LoggingInterceptor.class);
private static final LoggingInterceptor INSTANCE = new
LoggingInterceptor();
@@ -53,20 +53,20 @@ public class LoggingInterceptor implements
ClientInterceptor {
return new ForwardingClientCall.SimpleForwardingClientCall<T,
E>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<E> responseListener, final Metadata
headers) {
- LOGGER.trace("gRPC request header, rpcId={}, serviceName={},
methodName={}, authority={}, headers={}",
+ log.trace("gRPC request header, rpcId={}, serviceName={},
methodName={}, authority={}, headers={}",
rpcId, serviceName, methodName, authority, headers);
Listener<E> observabilityListener =
new
ForwardingClientCallListener.SimpleForwardingClientCallListener<E>(responseListener)
{
@Override
public void onMessage(E response) {
- LOGGER.trace("gRPC response, rpcId={},
serviceName={}, methodName={}, content:\n{}",
+ log.trace("gRPC response, rpcId={},
serviceName={}, methodName={}, content:\n{}",
rpcId, serviceName, methodName, response);
super.onMessage(response);
}
@Override
public void onHeaders(Metadata headers) {
- LOGGER.trace("gRPC response header, rpcId={},
serviceName={}, methodName={}, "
+ log.trace("gRPC response header, rpcId={},
serviceName={}, methodName={}, "
+ "authority={}, headers={}", rpcId,
serviceName, methodName, authority, headers);
super.onHeaders(headers);
}
@@ -76,7 +76,7 @@ public class LoggingInterceptor implements ClientInterceptor {
@Override
public void sendMessage(T request) {
- LOGGER.trace("gRPC request, rpcId={}, serviceName={},
methodName={}, content:\n{}", rpcId,
+ log.trace("gRPC request, rpcId={}, serviceName={},
methodName={}, content:\n{}", rpcId,
serviceName, methodName, request);
super.sendMessage(request);
}