This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 8a07466ab1e5b37cc8e6ede1522c3d6308c90013
Author: CMonkey <42.codemon...@gmail.com>
AuthorDate: Wed Aug 14 10:37:20 2019 +0800

    SCB-1418 change KafkaMessagePublisher send mode from future mode to sync mode
---
 .../fsm/channel/kafka/KafkaMessagePublisher.java   | 28 +++++++---------------
 1 file changed, 8 insertions(+), 20 deletions(-)

diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
index 4b1e511..068f782 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
@@ -21,9 +21,8 @@ import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.ListenableFutureCallback;
+
+import java.util.concurrent.ExecutionException;
 
 public class KafkaMessagePublisher implements MessagePublisher {
 
@@ -42,23 +41,12 @@ public class KafkaMessagePublisher implements MessagePublisher {
         if(logger.isDebugEnabled()){
             logger.debug("send message [{}] to [{}]", data, topic);
         }
-        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topic, data);
-
-        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
-            @Override
-            public void onFailure(Throwable throwable) {
 
-                if(logger.isDebugEnabled()){
-                    logger.debug("send message failure [{}]", throwable.getMessage(), throwable);
-                }
-            }
-
-            @Override
-            public void onSuccess(SendResult<String, Object> result) {
-                if(logger.isDebugEnabled()){
-                    logger.debug("send success result offset = [{}]", result.getRecordMetadata().offset());
-                }
-            }
-        });
+        try {
+            kafkaTemplate.send(topic, data).get();
+        } catch (InterruptedException | ExecutionException e) {
+            logger.error("publish Exception = [{}]", e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
     }
 }
\ No newline at end of file

Reply via email to