This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git

commit 08b9d087cbb454bb11c534ab33073069115c12bf
Author: wangkai <[email protected]>
AuthorDate: Fri Nov 3 16:59:53 2023 +0800

    [e2eTest] add e2eTest model
---
 .../adapter/storage/rocketmq/impl/DefaultSendCallback.java            | 4 ++++
 .../adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java    | 3 +++
 2 files changed, 7 insertions(+)

diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java
index 35bc7ef..d1df650 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java
@@ -17,12 +17,14 @@
 
 package org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.impl;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.eventbridge.domain.model.data.PutEventCallback;
 import 
org.apache.rocketmq.eventbridge.domain.model.data.PutEventsResponseEntry;
 import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode;
 
+@Slf4j
 public class DefaultSendCallback implements SendCallback {
 
     PutEventCallback putEventCallback;
@@ -35,6 +37,7 @@ public class DefaultSendCallback implements SendCallback {
 
     @Override
     public void onSuccess(SendResult sendResult) {
+        log.info("send msg to topic :{} success result.", sendResult);
         entry.setEventId(sendResult.getMsgId());
         entry.setErrorCode(DefaultErrorCode.Success.getCode());
         putEventCallback.endProcess(entry);
@@ -42,6 +45,7 @@ public class DefaultSendCallback implements SendCallback {
 
     @Override
     public void onException(Throwable throwable) {
+        log.error("send msg to topic : fail result.", throwable);
         entry.setErrorCode(DefaultErrorCode.InternalError.getCode());
         entry.setErrorMessage(throwable.getMessage());
         putEventCallback.endProcess(entry);
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java
index 5b1d796..bf24411 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java
@@ -75,8 +75,11 @@ public class RocketMQEventDataRepository implements 
EventDataRepository {
         String topicName = this.getTopicName(accountId, eventBusName);
         Message msg = eventDataOnRocketMQConnectAPI.converter(accountId, 
topicName, eventBridgeEvent);
         try {
+            log.info("send msg to topic :{} before.", topicName);
             producer.send(msg, new DefaultSendCallback(putEventCallback), 
1000L);
+            log.info("send msg to topic :{} end.", topicName);
         } catch (Throwable e) {
+            log.info("send msg to topic :{} failed. and exception is {}", 
topicName, e.toString());
             throw new EventBridgeException(EventBridgeErrorCode.InternalError, 
e);
         }
         return true;

Reply via email to