This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8150e13a29 [ISSUE #8145] Optimize some code style in client module (#8146)
8150e13a29 is described below
commit 8150e13a29f9d5ea7bf8814960389837650164af
Author: Willhow <[email protected]>
AuthorDate: Thu May 16 09:50:25 2024 +0800
[ISSUE #8145] Optimize some code style in client module (#8146)
---
.../consumer/ConsumeMessageOrderlyService.java | 8 ++++---
.../client/impl/consumer/RebalancePushImpl.java | 4 ----
.../client/impl/factory/MQClientInstance.java | 6 ++---
.../impl/producer/DefaultMQProducerImpl.java | 28 +++++++++++-----------
.../client/latency/LatencyFaultToleranceImpl.java | 8 +++++++
.../client/trace/AsyncTraceDispatcher.java | 7 +++---
6 files changed, 34 insertions(+), 27 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 36d686048c..3ca465da70 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -85,6 +85,7 @@ public class ConsumeMessageOrderlyService implements
ConsumeMessageService {
this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
}
+ @Override
public void start() {
if
(MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()))
{
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@@ -96,10 +97,11 @@ public class ConsumeMessageOrderlyService implements
ConsumeMessageService {
log.error("scheduleAtFixedRate lockMQPeriodically
exception", e);
}
}
- }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL,
TimeUnit.MILLISECONDS);
+ }, 1000, ProcessQueue.REBALANCE_LOCK_INTERVAL,
TimeUnit.MILLISECONDS);
}
}
+ @Override
public void shutdown(long awaitTerminateMillis) {
this.stopped = true;
this.scheduledExecutorService.shutdown();
@@ -201,8 +203,8 @@ public class ConsumeMessageOrderlyService implements
ConsumeMessageService {
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
- final boolean dispathToConsume) {
- if (dispathToConsume) {
+ final boolean dispatchToConsume) {
+ if (dispatchToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue,
messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 59e087c0e0..fe2f19b2f9 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -140,10 +140,6 @@ public class RebalancePushImpl extends RebalanceImpl {
return
defaultMQPushConsumerImpl.getDefaultMQPushConsumer().isClientRebalance() ||
defaultMQPushConsumerImpl.isConsumeOrderly() ||
MessageModel.BROADCASTING.equals(messageModel);
}
- public boolean removeUnnecessaryPopMessageQueue(final MessageQueue mq,
final PopProcessQueue pq) {
- return true;
- }
-
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_PASSIVELY;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index f964869ac2..1ff35a00d1 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -125,8 +125,8 @@ public class MQClientInstance {
private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable
= new ConcurrentHashMap<>();
private final ConcurrentMap<String/* Broker Name */, HashMap<String/*
address */, Integer>> brokerVersionTable = new ConcurrentHashMap<>();
- private final Set<String/* Broker address */> brokerSupportV2HeartbeatSet
= new HashSet();
- private final ConcurrentMap<String, Integer>
brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap();
+ private final Set<String/* Broker address */> brokerSupportV2HeartbeatSet
= new HashSet<>();
+ private final ConcurrentMap<String, Integer>
brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,
"MQClientFactoryScheduledThread"));
private final ScheduledExecutorService fetchRemoteConfigExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
@@ -1161,7 +1161,7 @@ public class MQClientInstance {
Entry<Long, String> entry = map.entrySet().iterator().next();
brokerAddr = entry.getValue();
slave = entry.getKey() != MixAll.MASTER_ID;
- found = true;
+ found = brokerAddr != null;
}
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 5b7bd2dc9d..6268bcc0a1 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -570,8 +570,8 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
}
class BackpressureSendCallBack implements SendCallback {
- public boolean isSemaphoreAsyncSizeAquired = false;
- public boolean isSemaphoreAsyncNumAquired = false;
+ public boolean isSemaphoreAsyncSizeAcquired = false;
+ public boolean isSemaphoreAsyncNumbAcquired = false;
public int msgLen;
private final SendCallback sendCallback;
@@ -581,10 +581,10 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
@Override
public void onSuccess(SendResult sendResult) {
- if (isSemaphoreAsyncSizeAquired) {
+ if (isSemaphoreAsyncSizeAcquired) {
semaphoreAsyncSendSize.release(msgLen);
}
- if (isSemaphoreAsyncNumAquired) {
+ if (isSemaphoreAsyncNumbAcquired) {
semaphoreAsyncSendNum.release();
}
sendCallback.onSuccess(sendResult);
@@ -592,10 +592,10 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
@Override
public void onException(Throwable e) {
- if (isSemaphoreAsyncSizeAquired) {
+ if (isSemaphoreAsyncSizeAcquired) {
semaphoreAsyncSendSize.release(msgLen);
}
- if (isSemaphoreAsyncNumAquired) {
+ if (isSemaphoreAsyncNumbAcquired) {
semaphoreAsyncSendNum.release();
}
sendCallback.onException(e);
@@ -607,31 +607,31 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
throws MQClientException, InterruptedException {
ExecutorService executor = this.getAsyncSenderExecutor();
boolean isEnableBackpressureForAsyncMode =
this.getDefaultMQProducer().isEnableBackpressureForAsyncMode();
- boolean isSemaphoreAsyncNumAquired = false;
- boolean isSemaphoreAsyncSizeAquired = false;
+ boolean isSemaphoreAsyncNumbAcquired = false;
+ boolean isSemaphoreAsyncSizeAcquired = false;
int msgLen = msg.getBody() == null ? 1 : msg.getBody().length;
try {
if (isEnableBackpressureForAsyncMode) {
long costTime = System.currentTimeMillis() - beginStartTime;
- isSemaphoreAsyncNumAquired = timeout - costTime > 0
+ isSemaphoreAsyncNumbAcquired = timeout - costTime > 0
&& semaphoreAsyncSendNum.tryAcquire(timeout - costTime,
TimeUnit.MILLISECONDS);
- if (!isSemaphoreAsyncNumAquired) {
+ if (!isSemaphoreAsyncNumbAcquired) {
sendCallback.onException(
new RemotingTooMuchRequestException("send message
tryAcquire semaphoreAsyncNum timeout"));
return;
}
costTime = System.currentTimeMillis() - beginStartTime;
- isSemaphoreAsyncSizeAquired = timeout - costTime > 0
+ isSemaphoreAsyncSizeAcquired = timeout - costTime > 0
&& semaphoreAsyncSendSize.tryAcquire(msgLen, timeout -
costTime, TimeUnit.MILLISECONDS);
- if (!isSemaphoreAsyncSizeAquired) {
+ if (!isSemaphoreAsyncSizeAcquired) {
sendCallback.onException(
new RemotingTooMuchRequestException("send message
tryAcquire semaphoreAsyncSize timeout"));
return;
}
}
- sendCallback.isSemaphoreAsyncSizeAquired =
isSemaphoreAsyncSizeAquired;
- sendCallback.isSemaphoreAsyncNumAquired =
isSemaphoreAsyncNumAquired;
+ sendCallback.isSemaphoreAsyncSizeAcquired =
isSemaphoreAsyncSizeAcquired;
+ sendCallback.isSemaphoreAsyncNumbAcquired =
isSemaphoreAsyncNumbAcquired;
sendCallback.msgLen = msgLen;
executor.submit(runnable);
} catch (RejectedExecutionException e) {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
index f629fe44a8..db8bbd66ef 100644
---
a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
@@ -55,6 +55,7 @@ public class LatencyFaultToleranceImpl implements
LatencyFaultTolerance<String>
this.serviceDetector = serviceDetector;
}
+ @Override
public void detectByOneRound() {
for (Map.Entry<String, FaultItem> item :
this.faultItemTable.entrySet()) {
FaultItem brokerItem = item.getValue();
@@ -77,6 +78,7 @@ public class LatencyFaultToleranceImpl implements
LatencyFaultTolerance<String>
}
}
+ @Override
public void startDetector() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
@@ -92,6 +94,7 @@ public class LatencyFaultToleranceImpl implements
LatencyFaultTolerance<String>
}, 3, 3, TimeUnit.SECONDS);
}
+ @Override
public void shutdown() {
this.scheduledExecutorService.shutdown();
}
@@ -128,6 +131,7 @@ public class LatencyFaultToleranceImpl implements
LatencyFaultTolerance<String>
return true;
}
+ @Override
public boolean isReachable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
@@ -141,10 +145,12 @@ public class LatencyFaultToleranceImpl implements
LatencyFaultTolerance<String>
this.faultItemTable.remove(name);
}
+ @Override
public boolean isStartDetectorEnable() {
return startDetectorEnable;
}
+ @Override
public void setStartDetectorEnable(boolean startDetectorEnable) {
this.startDetectorEnable = startDetectorEnable;
}
@@ -177,10 +183,12 @@ public class LatencyFaultToleranceImpl implements
LatencyFaultTolerance<String>
'}';
}
+ @Override
public void setDetectTimeout(final int detectTimeout) {
this.detectTimeout = detectTimeout;
}
+ @Override
public void setDetectInterval(final int detectInterval) {
this.detectInterval = detectInterval;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index d44f22616f..1fe19773a5 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -153,6 +153,7 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
this.namespaceV2 = namespaceV2;
}
+ @Override
public void start(String nameSrvAddr, AccessChannel accessChannel) throws
MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
@@ -330,7 +331,7 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
private int currentMsgKeySize;
private final String traceTopicName;
private final String regionId;
- private final List<TraceTransferBean> traceTransferBeanList = new
ArrayList();
+ private final List<TraceTransferBean> traceTransferBeanList = new
ArrayList<>();
TraceDataSegment(String traceTopicName, String regionId) {
this.traceTopicName = traceTopicName;
@@ -345,7 +346,7 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
this.currentMsgKeySize = traceTransferBean.getTransKey().stream()
.reduce(currentMsgKeySize, (acc, x) -> acc + x.length(),
Integer::sum);
if (currentMsgSize >= traceProducer.getMaxMessageSize() - 10 *
1000 || currentMsgKeySize >= MAX_MSG_KEY_SIZE) {
- List<TraceTransferBean> dataToSend = new
ArrayList(traceTransferBeanList);
+ List<TraceTransferBean> dataToSend = new
ArrayList<>(traceTransferBeanList);
AsyncDataSendTask asyncDataSendTask = new
AsyncDataSendTask(traceTopicName, regionId, dataToSend);
traceExecutor.submit(asyncDataSendTask);
this.clear();
@@ -356,7 +357,7 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
if (this.traceTransferBeanList.isEmpty()) {
return;
}
- List<TraceTransferBean> dataToSend = new
ArrayList(traceTransferBeanList);
+ List<TraceTransferBean> dataToSend = new
ArrayList<>(traceTransferBeanList);
AsyncDataSendTask asyncDataSendTask = new
AsyncDataSendTask(traceTopicName, regionId, dataToSend);
traceExecutor.submit(asyncDataSendTask);