This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git

commit bf35490edf14efa38c413dbe84b0c780a25ac0e5
Author: Jason Huynh <[email protected]>
AuthorDate: Wed Jan 22 21:46:46 2020 -0800

    added logger to classes
    Removed system outs
---
 src/main/java/geode/kafka/GeodeConnectorConfig.java          |  4 ++++
 .../java/geode/kafka/source/GeodeKafkaSourceListener.java    |  7 +++++--
 src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java   | 12 +++++++-----
 src/test/java/geode/kafka/WorkerAndHerderCluster.java        |  1 -
 .../java/geode/kafka/source/GeodeKafkaSourceTaskTest.java    |  2 +-
 5 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index 9ac561f..4f75ec0 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -110,6 +110,10 @@ public class GeodeConnectorConfig {
     }
 
 
+    public int getTaskId() {
+        return taskId;
+    }
+
     public String getDurableClientId() {
         return durableClientId;
     }
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
index c4d6b22..019bb5a 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -2,12 +2,16 @@ package geode.kafka.source;
 
 import org.apache.geode.cache.query.CqEvent;
 import org.apache.geode.cache.query.CqStatusListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 class GeodeKafkaSourceListener implements CqStatusListener {
 
+    private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceListener.class);
+
     public String regionName;
     private BlockingQueue<GeodeEvent> eventBuffer;
 
@@ -19,7 +23,6 @@ class GeodeKafkaSourceListener implements CqStatusListener {
     @Override
     public void onEvent(CqEvent aCqEvent) {
         try {
-            System.out.println("JASON cqEvent and putting into eventBuffer");
             eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
 
@@ -30,7 +33,7 @@ class GeodeKafkaSourceListener implements CqStatusListener {
                 } catch (InterruptedException ex) {
                     ex.printStackTrace();
                 }
-                System.out.println("GeodeKafkaSource Queue is full");
+                logger.info("GeodeKafkaSource Queue is full");
             }
         }
     }
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 23fa141..3f2ac80 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -6,6 +6,8 @@ import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqAttributesFactory;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -22,16 +24,16 @@ import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
 
 public class GeodeKafkaSourceTask extends SourceTask {
 
+    private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceTask.class);
+
     private static final String TASK_PREFIX = "TASK";
     private static final String DOT = ".";
 
     //property string to pass in to identify task
     private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
 
-    private int taskId;
     private GeodeContext geodeContext;
     private List<String> topics;
-
     private Map<String, Map<String, String>> sourcePartitions;
     private static BlockingQueue<GeodeEvent> eventBuffer;
     private int batchSize;
@@ -52,6 +54,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
     public void start(Map<String, String> props) {
         try {
             GeodeConnectorConfig geodeConnectorConfig = new GeodeConnectorConfig(props);
+            logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
             geodeContext = new GeodeContext(geodeConnectorConfig);
 
             batchSize = Integer.parseInt(props.get(BATCH_SIZE));
@@ -66,8 +69,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
             installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix);
         }
         catch (Exception e) {
-            System.out.println("Exception:" + e);
-            e.printStackTrace();
+            logger.error("Unable to start task", e);
             throw e;
         }
     }
@@ -82,7 +84,6 @@ public class GeodeKafkaSourceTask extends SourceTask {
                     records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent()));
                 }
             }
-
             return records;
         }
 
@@ -96,6 +97,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
 
     void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix) {
       boolean isDurable = geodeConnectorConfig.isDurable();
+      int taskId = geodeConnectorConfig.getTaskId();
         for (String region : geodeConnectorConfig.getRegionNames()) {
             installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, isDurable);
         }
diff --git a/src/test/java/geode/kafka/WorkerAndHerderCluster.java b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
index c347946..37a53f8 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderCluster.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
@@ -11,7 +11,6 @@ public class WorkerAndHerderCluster {
     }
 
     public void start() throws IOException, InterruptedException {
-        System.out.println("JASON starting worker");
         workerAndHerder.exec();
 
     }
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 33c260d..de78345 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -74,7 +74,7 @@ public class GeodeKafkaSourceTaskTest {
         props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE);
 
         GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        task.start(props);
+//        task.start(props);
 
 //        assertThat(task.getQueueSize(GeodeConnectorConfig.QUEUE_SIZE));
 

Reply via email to