This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 20b3a9275 [ISSUE #4326] Optimized some code style.
20b3a9275 is described below
commit 20b3a9275cefa0b2cadf05fe9275339b072f9802
Author: Oliver <[email protected]>
AuthorDate: Thu May 19 14:27:41 2022 +0800
[ISSUE #4326] Optimized some code style.
[ISSUE #4326] Optimized some code style.
---
.../apache/rocketmq/client/consumer/DefaultLitePullConsumer.java | 2 ++
.../org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java | 2 ++
.../org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java | 6 ++++--
.../rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java | 6 ++++--
.../org/apache/rocketmq/client/producer/DefaultMQProducer.java | 7 ++-----
.../src/main/java/org/apache/rocketmq/common/CountDownLatch2.java | 2 ++
6 files changed, 16 insertions(+), 9 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 74d6f3455..440e37e49 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -447,10 +447,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this.offsetStore = offsetStore;
}
+ @Override
public boolean isUnitMode() {
return unitMode;
}
+ @Override
public void setUnitMode(boolean isUnitMode) {
this.unitMode = isUnitMode;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 1c3f3da4d..4784e72e1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -434,10 +434,12 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
return defaultMQPullConsumerImpl;
}
+ @Override
public boolean isUnitMode() {
return unitMode;
}
+ @Override
public void setUnitMode(boolean isUnitMode) {
this.unitMode = isUnitMode;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 8e1c8d15a..b9a82e010 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -185,7 +185,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
*
* <p>
- * The size of a message only measured by message body, so it's not accurate
+ * The size(MB) of a message only measured by message body, so it's not accurate
*/
private int pullThresholdSizeForQueue = 100;
@@ -417,7 +417,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Override
public void setUseTLS(boolean useTLS) {
super.setUseTLS(useTLS);
- if (traceDispatcher != null && traceDispatcher instanceof AsyncTraceDispatcher) {
+ if (traceDispatcher instanceof AsyncTraceDispatcher) {
((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS);
}
}
@@ -860,10 +860,12 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.postSubscriptionWhenPull = postSubscriptionWhenPull;
}
+ @Override
public boolean isUnitMode() {
return unitMode;
}
+ @Override
public void setUnitMode(boolean isUnitMode) {
this.unitMode = isUnitMode;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index eed5fa43f..7319fdad7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -367,8 +367,10 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
} catch (Exception e) {
log.error("parse subscription error", e);
}
- ms.setSubVersion(0L);
- result.add(ms);
+ if (ms != null) {
+ ms.setSubVersion(0L);
+ result.add(ms);
+ }
}
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 02287dd53..0d17d4db8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -38,9 +38,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.topic.TopicValidator;
@@ -264,7 +262,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void setUseTLS(boolean useTLS) {
super.setUseTLS(useTLS);
- if (traceDispatcher != null && traceDispatcher instanceof AsyncTraceDispatcher) {
+ if (traceDispatcher instanceof AsyncTraceDispatcher) {
((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS);
}
}
@@ -896,9 +894,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
public MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
try {
- MessageId oldMsgId = MessageDecoder.decodeMessageId(msgId);
return this.viewMessage(msgId);
- } catch (Exception e) {
+ } catch (Exception ignored) {
}
return this.defaultMQProducerImpl.queryMessageByUniqKey(withNamespace(topic), msgId);
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/CountDownLatch2.java b/common/src/main/java/org/apache/rocketmq/common/CountDownLatch2.java
index 9c95fff7b..4e43e5ce3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/CountDownLatch2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/CountDownLatch2.java
@@ -172,10 +172,12 @@ public class CountDownLatch2 {
return getState();
}
+ @Override
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
+ @Override
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (; ; ) {