This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 40099b16f0 [INLONG-8871][TubeMQ] Use an error code in
checkMessageAndStatus() to return the check result instead of throwing an
exception (#8872)
40099b16f0 is described below
commit 40099b16f0b275a9e8ce19ce8cb81db369713dd9
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Sep 11 14:10:42 2023 +0800
[INLONG-8871][TubeMQ] Use an error code in checkMessageAndStatus() to
return the check result instead of throwing an exception (#8872)
---
.../tubemq/client/producer/MessageSentResult.java | 7 +++
.../client/producer/SimpleMessageProducer.java | 53 ++++++++++++++--------
.../inlong/tubemq/corebase/TErrCodeConstants.java | 6 +++
3 files changed, 48 insertions(+), 18 deletions(-)
diff --git
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageSentResult.java
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageSentResult.java
index 8a7b042b89..c5d1a580f4 100644
---
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageSentResult.java
+++
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageSentResult.java
@@ -32,6 +32,13 @@ public class MessageSentResult {
private long appendTime = TBaseConstants.META_VALUE_UNDEFINED;
private long appendOffset = TBaseConstants.META_VALUE_UNDEFINED;
+ public MessageSentResult(Message message, boolean success, int errCode,
String errMsg) {
+ this.message = message;
+ this.success = success;
+ this.errCode = errCode;
+ this.errMsg = errMsg;
+ }
+
public MessageSentResult(boolean success, int errCode, String errMsg,
Message message, long messageId, Partition partition) {
this.success = success;
diff --git
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
index e001f65536..8ac154cf4d 100644
---
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
+++
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
@@ -210,7 +210,10 @@ public class SimpleMessageProducer implements
MessageProducer {
@Override
public MessageSentResult sendMessage(final Message message)
throws TubeClientException, InterruptedException {
- checkMessageAndStatus(message);
+ MessageSentResult result = checkMessageAndStatus(message);
+ if (!result.isSuccess()) {
+ return result;
+ }
Partition partition = this.selectPartition(message,
BrokerWriteService.class);
int brokerId = partition.getBrokerId();
long startTime = System.currentTimeMillis();
@@ -243,7 +246,11 @@ public class SimpleMessageProducer implements
MessageProducer {
@Override
public void sendMessage(final Message message, final MessageSentCallback
cb) throws TubeClientException,
InterruptedException {
- checkMessageAndStatus(message);
+ MessageSentResult result = checkMessageAndStatus(message);
+ if (!result.isSuccess()) {
+ cb.onMessageSent(result);
+ return;
+ }
final Partition partition =
this.selectPartition(message,
BrokerWriteService.AsyncService.class);
final int brokerId = partition.getBrokerId();
@@ -297,40 +304,50 @@ public class SimpleMessageProducer implements
MessageProducer {
}
}
- private void checkMessageAndStatus(final Message message) throws
TubeClientException {
+ private MessageSentResult checkMessageAndStatus(final Message message) {
if (message == null) {
- throw new TubeClientException("Illegal parameter: null message
package!");
+ return new MessageSentResult(message, false,
+ TErrCodeConstants.PARAMETER_MSG_NULL, "Illegal parameter:
null message package!");
}
if (TStringUtils.isBlank(message.getTopic())) {
- throw new TubeClientException("Illegal parameter: blank topic in
message package!");
+ return new MessageSentResult(message, false,
+ TErrCodeConstants.PARAMETER_MSG_TOPIC_BLANK, "Illegal
parameter: blank topic in message package!");
}
if ((message.getData() == null)
|| (message.getData().length == 0)) {
- throw new TubeClientException("Illegal parameter: null data in
message package!");
+ return new MessageSentResult(message, false,
+ TErrCodeConstants.PARAMETER_MSG_BODY_EMPTY, "Illegal
parameter: null data in message package!");
}
if (this.publishTopicMap.get(message.getTopic()) == null) {
- throw new TubeClientException(new StringBuilder(512)
- .append("Topic ").append(message.getTopic())
- .append(" not publish, please publish first!").toString());
+ return new MessageSentResult(message, false,
+ TErrCodeConstants.PARAMETER_MSG_TOPIC_UNPUBLISHED,
+ new StringBuilder(512).append("Topic
").append(message.getTopic())
+ .append(" not publish, please publish
first!").toString());
}
if (this.producerManager.getTopicPartition(message.getTopic()) ==
null) {
- throw new TubeClientException(new StringBuilder(512)
- .append("Topic ").append(message.getTopic())
- .append(" not publish, make sure the topic exist or
acceptPublish and try later!").toString());
+ return new MessageSentResult(message, false,
+ TErrCodeConstants.PARAMETER_MSG_TOPIC_NO_PARTITION,
+ new StringBuilder(512).append("Topic
").append(message.getTopic())
+ .append(" not publish, make sure the topic exist
or acceptPublish and try later!")
+ .toString());
}
int msgSize = TStringUtils.isBlank(message.getAttribute())
? message.getData().length
: (message.getData().length + message.getAttribute().length());
if (msgSize > producerManager.getMaxMsgSize(message.getTopic())) {
- throw new TubeClientException(new StringBuilder(512)
- .append("Illegal parameter: over max message length for
the total size of")
- .append(" message data and attribute, allowed size is ")
- .append(producerManager.getMaxMsgSize(message.getTopic()))
- .append(", message's real size is
").append(msgSize).toString());
+ return new MessageSentResult(message, false,
+ TErrCodeConstants.PARAMETER_MSG_OVER_MAX_LENGTH,
+ new StringBuilder(512)
+ .append("Illegal parameter: over max message
length for the total size of")
+ .append(" message data and attribute, allowed size
is ")
+
.append(producerManager.getMaxMsgSize(message.getTopic()))
+ .append(", message's real size is
").append(msgSize).toString());
}
if (isShutDown.get()) {
- throw new TubeClientException("Status error: producer has been
shutdown!");
+ return new MessageSentResult(message, false,
+ TErrCodeConstants.CLIENT_SHUTDOWN, "Status error: producer
has been shutdown!");
}
+ return new MessageSentResult(message, true, TErrCodeConstants.SUCCESS,
"Ok");
}
private ClientBroker.SendMessageRequestP2B
createSendMessageRequest(Partition partition,
diff --git
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
index 0a0eb5c1f1..4697bc5f28 100644
---
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
+++
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
@@ -52,6 +52,12 @@ public class TErrCodeConstants {
public static final int CLIENT_INCONSISTENT_SOURCECOUNT = 429;
public static final int CLIENT_DUPLICATE_INDEXID = 430;
public static final int TOPIC_NOT_DEPLOYED = 431;
+ public static final int PARAMETER_MSG_NULL = 440;
+ public static final int PARAMETER_MSG_TOPIC_BLANK = 441;
+ public static final int PARAMETER_MSG_BODY_EMPTY = 442;
+ public static final int PARAMETER_MSG_TOPIC_UNPUBLISHED = 443;
+ public static final int PARAMETER_MSG_TOPIC_NO_PARTITION = 444;
+ public static final int PARAMETER_MSG_OVER_MAX_LENGTH = 445;
public static final int CONSUME_GROUP_FORBIDDEN = 450;
public static final int SERVER_CONSUME_SPEED_LIMIT = 452;