This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 8a07466ab1e5b37cc8e6ede1522c3d6308c90013 Author: CMonkey <42.codemon...@gmail.com> AuthorDate: Wed Aug 14 10:37:20 2019 +0800 SCB-1418 change KafkaMessagePublisher send mode , future mode to sync mode --- .../fsm/channel/kafka/KafkaMessagePublisher.java | 28 +++++++--------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java index 4b1e511..068f782 100644 --- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java +++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java @@ -21,9 +21,8 @@ import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.SendResult; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; + +import java.util.concurrent.ExecutionException; public class KafkaMessagePublisher implements MessagePublisher { @@ -42,23 +41,12 @@ public class KafkaMessagePublisher implements MessagePublisher { if(logger.isDebugEnabled()){ logger.debug("send message [{}] to [{}]", data, topic); } - ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topic, data); - - listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { - @Override - public void onFailure(Throwable throwable) { - if(logger.isDebugEnabled()){ - logger.debug("send message failure [{}]", throwable.getMessage(), throwable); - } - } - - @Override - public void onSuccess(SendResult<String, Object> result) { - if(logger.isDebugEnabled()){ - logger.debug("send success result offset = [{}]", result.getRecordMetadata().offset()); - } - } - }); + try { + kafkaTemplate.send(topic, data).get(); + } catch (InterruptedException | ExecutionException e) { + logger.error("publish Exception = [{}]", e.getMessage(), e); + throw new RuntimeException(e); + } } } \ No newline at end of file