This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c3804b  Debug logging added
3c3804b is described below

commit 3c3804b85d18a0c485a798bf237997469bf1fa1c
Author: Naburun Nag <[email protected]>
AuthorDate: Tue Mar 3 17:46:36 2020 -0800

    Debug logging added
---
 src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java    | 5 +++++
 .../java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java     | 1 +
 2 files changed, 6 insertions(+)

diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java 
b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index 700688e..1f50ea4 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -85,12 +85,17 @@ public class GeodeKafkaSinkTask extends SinkTask {
 
   @Override
   public void put(Collection<SinkRecord> records) {
+    logger.debug("Received " + records.size() + " records.");
     put(records, new HashMap<>());
   }
 
   void put(Collection<SinkRecord> records, Map<String, BatchRecords> 
batchRecordsMap) {
     // spin off a new thread to handle this operation? Downside is ordering 
and retries...
     for (SinkRecord record : records) {
+      logger.debug("kafka coordinates:(Topic:"
+          + record.topic() +
+          " Partition:" + record.kafkaPartition() + " Offset:" + 
record.kafkaOffset()
+          + ")");
       updateBatchForRegionByTopic(record, batchRecordsMap);
     }
     batchRecordsMap.forEach(
diff --git 
a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java 
b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index d53a9e9..13e5b60 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -104,6 +104,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
     ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
     ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
     if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
+      logger.debug("Geode events polled :" + events.size());
       for (GeodeEvent event : events) {
         String regionName = event.getRegionName();
         List<String> topics = regionToTopics.get(regionName);

Reply via email to