This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git

commit 63fd78f0dd0dc683b2235247511e3377b0809150
Author: Jason Huynh <[email protected]>
AuthorDate: Fri Jan 31 16:42:01 2020 -0800

    Removed system outs
    Passing through the security auth init property
---
 src/main/java/geode/kafka/GeodeConnectorConfig.java      |  7 +++++++
 src/main/java/geode/kafka/GeodeContext.java              | 16 +++++++++++-----
 src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java   |  3 +--
 .../java/geode/kafka/source/GeodeKafkaSourceTask.java    |  6 +-----
 4 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index 88f5a02..ac9a31f 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -32,9 +32,11 @@ public class GeodeConnectorConfig {
      */
     public static final String LOCATORS = "locators";
     public static final String DEFAULT_LOCATOR = "localhost[10334]";
+    public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
 
     protected final int taskId;
     protected List<LocatorHostPort> locatorHostPorts;
+    private String securityClientAuthInit;
 
     protected GeodeConnectorConfig() {
         taskId = 0;
@@ -43,6 +45,7 @@ public class GeodeConnectorConfig {
     public GeodeConnectorConfig(Map<String, String> connectorProperties) {
         taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
         locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
+        securityClientAuthInit = connectorProperties.get(SECURITY_CLIENT_AUTH_INIT);
     }
 
 
@@ -116,4 +119,8 @@ public class GeodeConnectorConfig {
     public List<LocatorHostPort> getLocatorHostPorts() {
         return locatorHostPorts;
     }
+
+    public String getSecurityClientAuthInit() {
+        return securityClientAuthInit;
+    }
 }
diff --git a/src/main/java/geode/kafka/GeodeContext.java b/src/main/java/geode/kafka/GeodeContext.java
index 2078782..ff8a8c3 100644
--- a/src/main/java/geode/kafka/GeodeContext.java
+++ b/src/main/java/geode/kafka/GeodeContext.java
@@ -26,6 +26,8 @@ import org.apache.kafka.connect.errors.ConnectException;
 import java.util.Collection;
 import java.util.List;
 
+import static geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
+
 public class GeodeContext {
 
     private ClientCache clientCache;
@@ -34,13 +36,13 @@ public class GeodeContext {
     public GeodeContext() {
     }
 
-    public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String durableClientId, String durableClientTimeout) {
-        clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout);
+    public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String durableClientId, String durableClientTimeout, String securityAuthInit) {
+        clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout, securityAuthInit);
         return clientCache;
     }
 
-    public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList) {
-        clientCache = createClientCache(locatorHostPortList, "", "");
+    public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String securityAuthInit) {
+        clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit);
         return clientCache;
     }
 
@@ -55,8 +57,12 @@ public class GeodeContext {
      * @param durableClientTimeOut
      * @return
      */
-    public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) {
+    public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut, String securityAuthInit) {
         ClientCacheFactory ccf = new ClientCacheFactory();
+
+        if (securityAuthInit != null) {
+            ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
+        }
         if (!durableClientName.equals("")) {
             ccf.set("durable-client-id", durableClientName)
                     .set("durable-client-timeout", durableClientTimeOut);
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
index a1d7ecf..66e528f 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -58,12 +58,11 @@ public class GeodeKafkaSinkTask extends SinkTask {
             GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
             logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
             geodeContext = new GeodeContext();
-            geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts());
+            geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getSecurityClientAuthInit());
             topicToRegions = geodeConnectorConfig.getTopicToRegions();
             regionNameToRegion = createProxyRegions(topicToRegions.values());
             nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
         } catch (Exception e) {
-            e.printStackTrace();
             logger.error("Unable to start sink task", e);
             throw e;
         }
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 0f6b50c..8d827a1 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -68,11 +68,10 @@ public class GeodeKafkaSourceTask extends SourceTask {
     @Override
     public void start(Map<String, String> props) {
         try {
-            System.out.println("JASON start task");
             geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
             logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
             geodeContext = new GeodeContext();
-            geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout());
+            geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(), geodeConnectorConfig.getSecurityClientAuthInit());
 
             batchSize = Integer.parseInt(props.get(BATCH_SIZE));
             int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
@@ -86,12 +85,9 @@ public class GeodeKafkaSourceTask extends SourceTask {
             boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
             installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion);
         } catch (Exception e) {
-            System.out.println("JASON start task failed" + e);
-            e.printStackTrace();
             logger.error("Unable to start source task", e);
             throw e;
         }
-        System.out.println("JASON end task");
 
     }
 

Reply via email to