This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8150e13a29 [ISSUE #8145] Optimize some code style in client module (#8146)
8150e13a29 is described below

commit 8150e13a29f9d5ea7bf8814960389837650164af
Author: Willhow <[email protected]>
AuthorDate: Thu May 16 09:50:25 2024 +0800

    [ISSUE #8145] Optimize some code style in client module (#8146)
---
 .../consumer/ConsumeMessageOrderlyService.java     |  8 ++++---
 .../client/impl/consumer/RebalancePushImpl.java    |  4 ----
 .../client/impl/factory/MQClientInstance.java      |  6 ++---
 .../impl/producer/DefaultMQProducerImpl.java       | 28 +++++++++++-----------
 .../client/latency/LatencyFaultToleranceImpl.java  |  8 +++++++
 .../client/trace/AsyncTraceDispatcher.java         |  7 +++---
 6 files changed, 34 insertions(+), 27 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 36d686048c..3ca465da70 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -85,6 +85,7 @@ public class ConsumeMessageOrderlyService implements 
ConsumeMessageService {
         this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
     }
 
+    @Override
     public void start() {
         if 
(MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()))
 {
             this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@@ -96,10 +97,11 @@ public class ConsumeMessageOrderlyService implements 
ConsumeMessageService {
                         log.error("scheduleAtFixedRate lockMQPeriodically 
exception", e);
                     }
                 }
-            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, 
TimeUnit.MILLISECONDS);
+            }, 1000, ProcessQueue.REBALANCE_LOCK_INTERVAL, 
TimeUnit.MILLISECONDS);
         }
     }
 
+    @Override
     public void shutdown(long awaitTerminateMillis) {
         this.stopped = true;
         this.scheduledExecutorService.shutdown();
@@ -201,8 +203,8 @@ public class ConsumeMessageOrderlyService implements 
ConsumeMessageService {
         final List<MessageExt> msgs,
         final ProcessQueue processQueue,
         final MessageQueue messageQueue,
-        final boolean dispathToConsume) {
-        if (dispathToConsume) {
+        final boolean dispatchToConsume) {
+        if (dispatchToConsume) {
             ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, 
messageQueue);
             this.consumeExecutor.submit(consumeRequest);
         }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 59e087c0e0..fe2f19b2f9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -140,10 +140,6 @@ public class RebalancePushImpl extends RebalanceImpl {
         return 
defaultMQPushConsumerImpl.getDefaultMQPushConsumer().isClientRebalance() || 
defaultMQPushConsumerImpl.isConsumeOrderly() || 
MessageModel.BROADCASTING.equals(messageModel);
     }
 
-    public boolean removeUnnecessaryPopMessageQueue(final MessageQueue mq, 
final PopProcessQueue pq) {
-        return true;
-    }
-
     @Override
     public ConsumeType consumeType() {
         return ConsumeType.CONSUME_PASSIVELY;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index f964869ac2..1ff35a00d1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -125,8 +125,8 @@ public class MQClientInstance {
     private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable 
= new ConcurrentHashMap<>();
 
     private final ConcurrentMap<String/* Broker Name */, HashMap<String/* 
address */, Integer>> brokerVersionTable = new ConcurrentHashMap<>();
-    private final Set<String/* Broker address */> brokerSupportV2HeartbeatSet 
= new HashSet();
-    private final ConcurrentMap<String, Integer> 
brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap();
+    private final Set<String/* Broker address */> brokerSupportV2HeartbeatSet 
= new HashSet<>();
+    private final ConcurrentMap<String, Integer> 
brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap<>();
     private final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, 
"MQClientFactoryScheduledThread"));
     private final ScheduledExecutorService fetchRemoteConfigExecutorService = 
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
         @Override
@@ -1161,7 +1161,7 @@ public class MQClientInstance {
                 Entry<Long, String> entry = map.entrySet().iterator().next();
                 brokerAddr = entry.getValue();
                 slave = entry.getKey() != MixAll.MASTER_ID;
-                found = true;
+                found = brokerAddr != null;
             }
         }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 5b7bd2dc9d..6268bcc0a1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -570,8 +570,8 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
     }
 
     class BackpressureSendCallBack implements SendCallback {
-        public boolean isSemaphoreAsyncSizeAquired = false;
-        public boolean isSemaphoreAsyncNumAquired = false;
+        public boolean isSemaphoreAsyncSizeAcquired = false;
+        public boolean isSemaphoreAsyncNumbAcquired = false;
         public int msgLen;
         private final SendCallback sendCallback;
 
@@ -581,10 +581,10 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
 
         @Override
         public void onSuccess(SendResult sendResult) {
-            if (isSemaphoreAsyncSizeAquired) {
+            if (isSemaphoreAsyncSizeAcquired) {
                 semaphoreAsyncSendSize.release(msgLen);
             }
-            if (isSemaphoreAsyncNumAquired) {
+            if (isSemaphoreAsyncNumbAcquired) {
                 semaphoreAsyncSendNum.release();
             }
             sendCallback.onSuccess(sendResult);
@@ -592,10 +592,10 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
 
         @Override
         public void onException(Throwable e) {
-            if (isSemaphoreAsyncSizeAquired) {
+            if (isSemaphoreAsyncSizeAcquired) {
                 semaphoreAsyncSendSize.release(msgLen);
             }
-            if (isSemaphoreAsyncNumAquired) {
+            if (isSemaphoreAsyncNumbAcquired) {
                 semaphoreAsyncSendNum.release();
             }
             sendCallback.onException(e);
@@ -607,31 +607,31 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
         throws MQClientException, InterruptedException {
         ExecutorService executor = this.getAsyncSenderExecutor();
         boolean isEnableBackpressureForAsyncMode = 
this.getDefaultMQProducer().isEnableBackpressureForAsyncMode();
-        boolean isSemaphoreAsyncNumAquired = false;
-        boolean isSemaphoreAsyncSizeAquired = false;
+        boolean isSemaphoreAsyncNumbAcquired = false;
+        boolean isSemaphoreAsyncSizeAcquired = false;
         int msgLen = msg.getBody() == null ? 1 : msg.getBody().length;
 
         try {
             if (isEnableBackpressureForAsyncMode) {
                 long costTime = System.currentTimeMillis() - beginStartTime;
-                isSemaphoreAsyncNumAquired = timeout - costTime > 0
+                isSemaphoreAsyncNumbAcquired = timeout - costTime > 0
                     && semaphoreAsyncSendNum.tryAcquire(timeout - costTime, 
TimeUnit.MILLISECONDS);
-                if (!isSemaphoreAsyncNumAquired) {
+                if (!isSemaphoreAsyncNumbAcquired) {
                     sendCallback.onException(
                         new RemotingTooMuchRequestException("send message 
tryAcquire semaphoreAsyncNum timeout"));
                     return;
                 }
                 costTime = System.currentTimeMillis() - beginStartTime;
-                isSemaphoreAsyncSizeAquired = timeout - costTime > 0
+                isSemaphoreAsyncSizeAcquired = timeout - costTime > 0
                     && semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - 
costTime, TimeUnit.MILLISECONDS);
-                if (!isSemaphoreAsyncSizeAquired) {
+                if (!isSemaphoreAsyncSizeAcquired) {
                     sendCallback.onException(
                         new RemotingTooMuchRequestException("send message 
tryAcquire semaphoreAsyncSize timeout"));
                     return;
                 }
             }
-            sendCallback.isSemaphoreAsyncSizeAquired = 
isSemaphoreAsyncSizeAquired;
-            sendCallback.isSemaphoreAsyncNumAquired = 
isSemaphoreAsyncNumAquired;
+            sendCallback.isSemaphoreAsyncSizeAcquired = 
isSemaphoreAsyncSizeAcquired;
+            sendCallback.isSemaphoreAsyncNumbAcquired = 
isSemaphoreAsyncNumbAcquired;
             sendCallback.msgLen = msgLen;
             executor.submit(runnable);
         } catch (RejectedExecutionException e) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
index f629fe44a8..db8bbd66ef 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
@@ -55,6 +55,7 @@ public class LatencyFaultToleranceImpl implements 
LatencyFaultTolerance<String>
         this.serviceDetector = serviceDetector;
     }
 
+    @Override
     public void detectByOneRound() {
         for (Map.Entry<String, FaultItem> item : 
this.faultItemTable.entrySet()) {
             FaultItem brokerItem = item.getValue();
@@ -77,6 +78,7 @@ public class LatencyFaultToleranceImpl implements 
LatencyFaultTolerance<String>
         }
     }
 
+    @Override
     public void startDetector() {
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
@@ -92,6 +94,7 @@ public class LatencyFaultToleranceImpl implements 
LatencyFaultTolerance<String>
         }, 3, 3, TimeUnit.SECONDS);
     }
 
+    @Override
     public void shutdown() {
         this.scheduledExecutorService.shutdown();
     }
@@ -128,6 +131,7 @@ public class LatencyFaultToleranceImpl implements 
LatencyFaultTolerance<String>
         return true;
     }
 
+    @Override
     public boolean isReachable(final String name) {
         final FaultItem faultItem = this.faultItemTable.get(name);
         if (faultItem != null) {
@@ -141,10 +145,12 @@ public class LatencyFaultToleranceImpl implements 
LatencyFaultTolerance<String>
         this.faultItemTable.remove(name);
     }
 
+    @Override
     public boolean isStartDetectorEnable() {
         return startDetectorEnable;
     }
 
+    @Override
     public void setStartDetectorEnable(boolean startDetectorEnable) {
         this.startDetectorEnable = startDetectorEnable;
     }
@@ -177,10 +183,12 @@ public class LatencyFaultToleranceImpl implements 
LatencyFaultTolerance<String>
                 '}';
     }
 
+    @Override
     public void setDetectTimeout(final int detectTimeout) {
         this.detectTimeout = detectTimeout;
     }
 
+    @Override
     public void setDetectInterval(final int detectInterval) {
         this.detectInterval = detectInterval;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index d44f22616f..1fe19773a5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -153,6 +153,7 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
         this.namespaceV2 = namespaceV2;
     }
 
+    @Override
     public void start(String nameSrvAddr, AccessChannel accessChannel) throws 
MQClientException {
         if (isStarted.compareAndSet(false, true)) {
             traceProducer.setNamesrvAddr(nameSrvAddr);
@@ -330,7 +331,7 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
         private int currentMsgKeySize;
         private final String traceTopicName;
         private final String regionId;
-        private final List<TraceTransferBean> traceTransferBeanList = new 
ArrayList();
+        private final List<TraceTransferBean> traceTransferBeanList = new 
ArrayList<>();
 
         TraceDataSegment(String traceTopicName, String regionId) {
             this.traceTopicName = traceTopicName;
@@ -345,7 +346,7 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
             this.currentMsgKeySize = traceTransferBean.getTransKey().stream()
                 .reduce(currentMsgKeySize, (acc, x) -> acc + x.length(), 
Integer::sum);
             if (currentMsgSize >= traceProducer.getMaxMessageSize() - 10 * 
1000 || currentMsgKeySize >= MAX_MSG_KEY_SIZE) {
-                List<TraceTransferBean> dataToSend = new 
ArrayList(traceTransferBeanList);
+                List<TraceTransferBean> dataToSend = new 
ArrayList<>(traceTransferBeanList);
                 AsyncDataSendTask asyncDataSendTask = new 
AsyncDataSendTask(traceTopicName, regionId, dataToSend);
                 traceExecutor.submit(asyncDataSendTask);
                 this.clear();
@@ -356,7 +357,7 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
             if (this.traceTransferBeanList.isEmpty()) {
                 return;
             }
-            List<TraceTransferBean> dataToSend = new 
ArrayList(traceTransferBeanList);
+            List<TraceTransferBean> dataToSend = new 
ArrayList<>(traceTransferBeanList);
             AsyncDataSendTask asyncDataSendTask = new 
AsyncDataSendTask(traceTopicName, regionId, dataToSend);
             traceExecutor.submit(asyncDataSendTask);
 

Reply via email to