This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit 08b9d087cbb454bb11c534ab33073069115c12bf Author: wangkai <[email protected]> AuthorDate: Fri Nov 3 16:59:53 2023 +0800 [e2eTest] add e2eTest model --- .../adapter/storage/rocketmq/impl/DefaultSendCallback.java | 4 ++++ .../adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java | 3 +++ 2 files changed, 7 insertions(+) diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java index 35bc7ef..d1df650 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java @@ -17,12 +17,14 @@ package org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.impl; +import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.eventbridge.domain.model.data.PutEventCallback; import org.apache.rocketmq.eventbridge.domain.model.data.PutEventsResponseEntry; import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode; +@Slf4j public class DefaultSendCallback implements SendCallback { PutEventCallback putEventCallback; @@ -35,6 +37,7 @@ public class DefaultSendCallback implements SendCallback { @Override public void onSuccess(SendResult sendResult) { + log.info("send msg to topic :{} success result.", sendResult); entry.setEventId(sendResult.getMsgId()); entry.setErrorCode(DefaultErrorCode.Success.getCode()); putEventCallback.endProcess(entry); @@ -42,6 +45,7 @@ public class DefaultSendCallback implements SendCallback { @Override public void onException(Throwable throwable) { + log.error("send msg to topic : fail result.", throwable); entry.setErrorCode(DefaultErrorCode.InternalError.getCode()); entry.setErrorMessage(throwable.getMessage()); putEventCallback.endProcess(entry); diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java index 5b1d796..bf24411 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java @@ -75,8 +75,11 @@ public class RocketMQEventDataRepository implements EventDataRepository { String topicName = this.getTopicName(accountId, eventBusName); Message msg = eventDataOnRocketMQConnectAPI.converter(accountId, topicName, eventBridgeEvent); try { + log.info("send msg to topic :{} before.", topicName); producer.send(msg, new DefaultSendCallback(putEventCallback), 1000L); + log.info("send msg to topic :{} end.", topicName); } catch (Throwable e) { + log.info("send msg to topic :{} failed. and exception is {}", topicName, e.toString()); throw new EventBridgeException(EventBridgeErrorCode.InternalError, e); } return true;
