This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 40099b16f0 [INLONG-8871][TubeMQ] Use an error code in 
checkMessageAndStatus() to return the check result instead of throwing an 
exception (#8872)
40099b16f0 is described below

commit 40099b16f0b275a9e8ce19ce8cb81db369713dd9
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Sep 11 14:10:42 2023 +0800

    [INLONG-8871][TubeMQ] Use an error code in checkMessageAndStatus() to 
return the check result instead of throwing an exception (#8872)
---
 .../tubemq/client/producer/MessageSentResult.java  |  7 +++
 .../client/producer/SimpleMessageProducer.java     | 53 ++++++++++++++--------
 .../inlong/tubemq/corebase/TErrCodeConstants.java  |  6 +++
 3 files changed, 48 insertions(+), 18 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageSentResult.java
 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageSentResult.java
index 8a7b042b89..c5d1a580f4 100644
--- 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageSentResult.java
+++ 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageSentResult.java
@@ -32,6 +32,13 @@ public class MessageSentResult {
     private long appendTime = TBaseConstants.META_VALUE_UNDEFINED;
     private long appendOffset = TBaseConstants.META_VALUE_UNDEFINED;
 
+    public MessageSentResult(Message message, boolean success, int errCode, 
String errMsg) {
+        this.message = message;
+        this.success = success;
+        this.errCode = errCode;
+        this.errMsg = errMsg;
+    }
+
     public MessageSentResult(boolean success, int errCode, String errMsg,
             Message message, long messageId, Partition partition) {
         this.success = success;
diff --git 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
index e001f65536..8ac154cf4d 100644
--- 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
+++ 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
@@ -210,7 +210,10 @@ public class SimpleMessageProducer implements 
MessageProducer {
     @Override
     public MessageSentResult sendMessage(final Message message)
             throws TubeClientException, InterruptedException {
-        checkMessageAndStatus(message);
+        MessageSentResult result = checkMessageAndStatus(message);
+        if (!result.isSuccess()) {
+            return result;
+        }
         Partition partition = this.selectPartition(message, 
BrokerWriteService.class);
         int brokerId = partition.getBrokerId();
         long startTime = System.currentTimeMillis();
@@ -243,7 +246,11 @@ public class SimpleMessageProducer implements 
MessageProducer {
     @Override
     public void sendMessage(final Message message, final MessageSentCallback 
cb) throws TubeClientException,
             InterruptedException {
-        checkMessageAndStatus(message);
+        MessageSentResult result = checkMessageAndStatus(message);
+        if (!result.isSuccess()) {
+            cb.onMessageSent(result);
+            return;
+        }
         final Partition partition =
                 this.selectPartition(message, 
BrokerWriteService.AsyncService.class);
         final int brokerId = partition.getBrokerId();
@@ -297,40 +304,50 @@ public class SimpleMessageProducer implements 
MessageProducer {
         }
     }
 
-    private void checkMessageAndStatus(final Message message) throws 
TubeClientException {
+    private MessageSentResult checkMessageAndStatus(final Message message) {
         if (message == null) {
-            throw new TubeClientException("Illegal parameter: null message 
package!");
+            return new MessageSentResult(message, false,
+                    TErrCodeConstants.PARAMETER_MSG_NULL, "Illegal parameter: 
null message package!");
         }
         if (TStringUtils.isBlank(message.getTopic())) {
-            throw new TubeClientException("Illegal parameter: blank topic in 
message package!");
+            return new MessageSentResult(message, false,
+                    TErrCodeConstants.PARAMETER_MSG_TOPIC_BLANK, "Illegal 
parameter: blank topic in message package!");
         }
         if ((message.getData() == null)
                 || (message.getData().length == 0)) {
-            throw new TubeClientException("Illegal parameter: null data in 
message package!");
+            return new MessageSentResult(message, false,
+                    TErrCodeConstants.PARAMETER_MSG_BODY_EMPTY, "Illegal 
parameter: null data in message package!");
         }
         if (this.publishTopicMap.get(message.getTopic()) == null) {
-            throw new TubeClientException(new StringBuilder(512)
-                    .append("Topic ").append(message.getTopic())
-                    .append(" not publish, please publish first!").toString());
+            return new MessageSentResult(message, false,
+                    TErrCodeConstants.PARAMETER_MSG_TOPIC_UNPUBLISHED,
+                    new StringBuilder(512).append("Topic 
").append(message.getTopic())
+                            .append(" not publish, please publish 
first!").toString());
         }
         if (this.producerManager.getTopicPartition(message.getTopic()) == 
null) {
-            throw new TubeClientException(new StringBuilder(512)
-                    .append("Topic ").append(message.getTopic())
-                    .append(" not publish, make sure the topic exist or 
acceptPublish and try later!").toString());
+            return new MessageSentResult(message, false,
+                    TErrCodeConstants.PARAMETER_MSG_TOPIC_NO_PARTITION,
+                    new StringBuilder(512).append("Topic 
").append(message.getTopic())
+                            .append(" not publish, make sure the topic exist 
or acceptPublish and try later!")
+                            .toString());
         }
         int msgSize = TStringUtils.isBlank(message.getAttribute())
                 ? message.getData().length
                 : (message.getData().length + message.getAttribute().length());
         if (msgSize > producerManager.getMaxMsgSize(message.getTopic())) {
-            throw new TubeClientException(new StringBuilder(512)
-                    .append("Illegal parameter: over max message length for 
the total size of")
-                    .append(" message data and attribute, allowed size is ")
-                    .append(producerManager.getMaxMsgSize(message.getTopic()))
-                    .append(", message's real size is 
").append(msgSize).toString());
+            return new MessageSentResult(message, false,
+                    TErrCodeConstants.PARAMETER_MSG_OVER_MAX_LENGTH,
+                    new StringBuilder(512)
+                            .append("Illegal parameter: over max message 
length for the total size of")
+                            .append(" message data and attribute, allowed size 
is ")
+                            
.append(producerManager.getMaxMsgSize(message.getTopic()))
+                            .append(", message's real size is 
").append(msgSize).toString());
         }
         if (isShutDown.get()) {
-            throw new TubeClientException("Status error: producer has been 
shutdown!");
+            return new MessageSentResult(message, false,
+                    TErrCodeConstants.CLIENT_SHUTDOWN, "Status error: producer 
has been shutdown!");
         }
+        return new MessageSentResult(message, true, TErrCodeConstants.SUCCESS, 
"Ok");
     }
 
     private ClientBroker.SendMessageRequestP2B 
createSendMessageRequest(Partition partition,
diff --git 
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
 
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
index 0a0eb5c1f1..4697bc5f28 100644
--- 
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
+++ 
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
@@ -52,6 +52,12 @@ public class TErrCodeConstants {
     public static final int CLIENT_INCONSISTENT_SOURCECOUNT = 429;
     public static final int CLIENT_DUPLICATE_INDEXID = 430;
     public static final int TOPIC_NOT_DEPLOYED = 431;
+    public static final int PARAMETER_MSG_NULL = 440;
+    public static final int PARAMETER_MSG_TOPIC_BLANK = 441;
+    public static final int PARAMETER_MSG_BODY_EMPTY = 442;
+    public static final int PARAMETER_MSG_TOPIC_UNPUBLISHED = 443;
+    public static final int PARAMETER_MSG_TOPIC_NO_PARTITION = 444;
+    public static final int PARAMETER_MSG_OVER_MAX_LENGTH = 445;
 
     public static final int CONSUME_GROUP_FORBIDDEN = 450;
     public static final int SERVER_CONSUME_SPEED_LIMIT = 452;

Reply via email to