This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit fa584b4cf30dedfaab4ceed6be47412e8f0d8c61 Author: CMonkey <[email protected]> AuthorDate: Wed Aug 14 10:58:11 2019 +0800 SCB-1418 add ackMode poolTimeout to KafkaChannelAutoConfiguration --- .../alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java index 6a451ec..a0793d1 100644 --- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java +++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java @@ -37,6 +37,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; @@ -79,6 +80,12 @@ public class KafkaChannelAutoConfiguration { @Value("${spring.kafka.consumer.auto.commit.interval.ms:100}") private int autoCommitIntervalMs; + @Value("${spring.kafka.listener.ackMode:MANUAL_IMMEDIATE}") + private String ackMode; + + @Value("${spring.kafka.listener.pollTimeout:1500}") + private long poolTimeout; + @Bean @ConditionalOnMissingBean public ProducerFactory<String, Object> producerFactory(){ @@ -125,7 +132,8 @@ public class KafkaChannelAutoConfiguration { ConcurrentKafkaListenerContainerFactory<String,Object> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>(); concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); - concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(1500L); + concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(poolTimeout); + concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.valueOf(ackMode)); return concurrentKafkaListenerContainerFactory; }
