This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit fa584b4cf30dedfaab4ceed6be47412e8f0d8c61
Author: CMonkey <[email protected]>
AuthorDate: Wed Aug 14 10:58:11 2019 +0800

    SCB-1418 add ackMode and pollTimeout settings to KafkaChannelAutoConfiguration
---
 .../alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
 
b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
index 6a451ec..a0793d1 100644
--- 
a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
+++ 
b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
@@ -37,6 +37,7 @@ import 
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.config.KafkaListenerContainerFactory;
 import org.springframework.kafka.core.*;
 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.listener.ContainerProperties;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 
@@ -79,6 +80,12 @@ public class KafkaChannelAutoConfiguration {
     @Value("${spring.kafka.consumer.auto.commit.interval.ms:100}")
     private int autoCommitIntervalMs;
 
+    @Value("${spring.kafka.listener.ackMode:MANUAL_IMMEDIATE}")
+    private String ackMode;
+
+    @Value("${spring.kafka.listener.pollTimeout:1500}")
+    private long poolTimeout;
+
     @Bean
     @ConditionalOnMissingBean
     public ProducerFactory<String, Object> producerFactory(){
@@ -125,7 +132,8 @@ public class KafkaChannelAutoConfiguration {
         ConcurrentKafkaListenerContainerFactory<String,Object> 
concurrentKafkaListenerContainerFactory =
                 new ConcurrentKafkaListenerContainerFactory<>();
         
concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
-        
concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(1500L);
+        
concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(poolTimeout);
+        
concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.valueOf(ackMode));
 
         return concurrentKafkaListenerContainerFactory;
     }

Reply via email to