Add some logging in kafka producer when it sends/has all sent etc.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6243402b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6243402b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6243402b Branch: refs/heads/master Commit: 6243402b2291af99c7f7b950f5e905489d403074 Parents: 8b5e93e Author: Claus Ibsen <[email protected]> Authored: Mon Apr 24 13:29:33 2017 +0200 Committer: Claus Ibsen <[email protected]> Committed: Mon Apr 24 13:29:33 2017 +0200 ---------------------------------------------------------------------- .../apache/camel/component/kafka/KafkaProducer.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6243402b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index ede3d3e..e3b556b 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -232,7 +232,11 @@ public class KafkaProducer extends DefaultAsyncProducer { } while (c.hasNext()) { - futures.add(kafkaProducer.send(c.next())); + ProducerRecord rec = c.next(); + if (log.isDebugEnabled()) { + log.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key()); + } + futures.add(kafkaProducer.send(rec)); } for (Future<RecordMetadata> f : futures) { //wait for them all to be sent @@ -248,7 +252,11 @@ public class KafkaProducer extends DefaultAsyncProducer { KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, callback); while (c.hasNext()) { cb.increment(); - kafkaProducer.send(c.next(), cb); + ProducerRecord rec = c.next(); + if (log.isDebugEnabled()) { + log.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key()); + } + kafkaProducer.send(rec, cb); } return cb.allSent(); } catch (Exception ex) { @@ -306,6 +314,7 @@ public class KafkaProducer extends DefaultAsyncProducer { boolean allSent() { if (count.decrementAndGet() == 0) { + log.trace("All messages sent, continue routing."); //was able to get all the work done while queuing the requests callback.done(true); return true; @@ -327,6 +336,7 @@ public class KafkaProducer extends DefaultAsyncProducer { workerPool.submit(new Runnable() { @Override public void run() { + log.trace("All messages sent, continue routing."); callback.done(false); } });
