[
https://issues.apache.org/jira/browse/CAMEL-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
yogananth mahalingam updated CAMEL-9182:
----------------------------------------
Comment: was deleted
(was: Hi [~davsclaus] -
Correct me if I am wrong. My interpretation of the code snippet below from
KafkaConsumer is:
* consumersCount - creates multiple connections and consumes messages as if
multiple endpoints were configured.
* consumerStreams - determines the number of KafkaStreams retrieved by each
connection.
The number of consumer connections created matches consumersCount, and each
stream gets its own consumer task.
So I expected the executor thread pool to be sized based on consumersCount.
{code}
@Override
protected void doStart() throws Exception {
    super.doStart();
    log.info("Starting Kafka consumer");
    executor = endpoint.createExecutor();
    for (int i = 0; i < endpoint.getConsumersCount(); i++) {
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps()));
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams());
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());
        // commit periodically
        if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
            if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs() < 0)
                    && endpoint.getConsumerStreams() > 1) {
                LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams.");
            }
            CyclicBarrier barrier = new CyclicBarrier(endpoint.getConsumerStreams(), new CommitOffsetTask(consumer));
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new BatchingConsumerTask(stream, barrier));
            }
            consumerBarriers.put(consumer, barrier);
        } else {
            // auto commit
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new AutoCommitConsumerTask(consumer, stream));
            }
            consumerBarriers.put(consumer, null);
        }
    }
}
{code})
> camel-kafka : Kafka Endpoint executor threadpool is not initialized based on
> consumercount
> ------------------------------------------------------------------------------------------
>
> Key: CAMEL-9182
> URL: https://issues.apache.org/jira/browse/CAMEL-9182
> Project: Camel
> Issue Type: Bug
> Affects Versions: 2.15.0
> Reporter: yogananth mahalingam
> Assignee: Claus Ibsen
>
> The Kafka endpoint's executor thread pool is expected to be sized based on
> the consumer count, which would allow multiple consumers to run concurrently.
> Instead, it is initialized based on consumerStreams.
> With a configuration of consumer count = 10 and consumer streams = 1, the
> messages are consumed sequentially.
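
The reported behaviour can be illustrated with a small standalone sketch. It does not use Camel or Kafka: the class PoolSizingSketch and its methods are hypothetical names introduced here, and the fixed thread pool is only an assumed stand-in for whatever endpoint.createExecutor() returns. Under that assumption, the loop in doStart() submits one task per stream per consumer, so consumersCount * consumerStreams tasks compete for the pool; if the pool has only consumerStreams threads, a 10/1 configuration would run its tasks one at a time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSizingSketch {

    // One task is submitted per stream per consumer, so this many threads
    // are needed for every task to run at the same time.
    static int requiredThreads(int consumersCount, int consumerStreams) {
        return consumersCount * consumerStreams;
    }

    // Submits taskCount short tasks to a fixed pool of poolSize threads and
    // returns the highest number of tasks observed running simultaneously.
    static int peakConcurrency(int poolSize, int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            executor.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(50); // stand-in for consuming from a stream
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    done.countDown();
                }
            });
        }
        try {
            done.await(); // wait until every task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
        return peak.get();
    }

    public static void main(String[] args) {
        int consumersCount = 10;
        int consumerStreams = 1;
        int tasks = requiredThreads(consumersCount, consumerStreams);
        // Pool sized by consumerStreams: tasks queue up behind a single thread.
        System.out.println("pool = consumerStreams: peak " + peakConcurrency(consumerStreams, tasks));
        // Pool sized by consumersCount * consumerStreams: tasks can overlap.
        System.out.println("pool = count * streams: peak " + peakConcurrency(tasks, tasks));
    }
}
```

With a pool of one thread the peak can never exceed 1, which matches the sequential consumption described above. Sizing by the product rather than by consumersCount alone is a choice made for this sketch, since each stream gets its own task.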
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)