This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3c3804b Debug logging added
3c3804b is described below
commit 3c3804b85d18a0c485a798bf237997469bf1fa1c
Author: Naburun Nag <[email protected]>
AuthorDate: Tue Mar 3 17:46:36 2020 -0800
Debug logging added
---
src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java | 5 +++++
.../java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java | 1 +
2 files changed, 6 insertions(+)
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index 700688e..1f50ea4 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -85,12 +85,17 @@ public class GeodeKafkaSinkTask extends SinkTask {
@Override
public void put(Collection<SinkRecord> records) {
+ logger.debug("Received " + records.size() + " records.");
put(records, new HashMap<>());
}
void put(Collection<SinkRecord> records, Map<String, BatchRecords>
batchRecordsMap) {
// spin off a new thread to handle this operation? Downside is ordering
and retries...
for (SinkRecord record : records) {
+ logger.debug("kafka coordinates:(Topic:"
+ + record.topic() +
+ " Partition:" + record.kafkaPartition() + " Offset:" +
record.kafkaOffset()
+ + ")");
updateBatchForRegionByTopic(record, batchRecordsMap);
}
batchRecordsMap.forEach(
diff --git
a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index d53a9e9..13e5b60 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -104,6 +104,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
+ logger.debug("Geode events polled :" + events.size());
for (GeodeEvent event : events) {
String regionName = event.getRegionName();
List<String> topics = regionToTopics.get(regionName);