This is an automated email from the ASF dual-hosted git repository.

jasonhuynh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new b3e4f05  Modified initial results to consume the correct type Modified security
b3e4f05 is described below

commit b3e4f059f25ccb18d69d30a34dc04b91abd5c14c
Author: Jason Huynh <[email protected]>
AuthorDate: Tue Feb 11 13:13:17 2020 -0800

    Modified initial results to consume the correct type
    Modified security
---
 .../java/org/geode/kafka/GeodeConnectorConfig.java | 18 ++++++++++++-----
 src/main/java/org/geode/kafka/GeodeContext.java    | 23 ++++++++++++++--------
 .../kafka/security/SystemPropertyAuthInit.java     |  4 ++--
 .../org/geode/kafka/sink/GeodeKafkaSinkTask.java   |  2 +-
 .../java/org/geode/kafka/source/GeodeEvent.java    | 19 ++++++++++++++----
 .../geode/kafka/source/GeodeKafkaSourceTask.java   | 11 ++++++-----
 .../kafka/source/GeodeKafkaSourceTaskTest.java     | 19 +++++++++++++-----
 7 files changed, 66 insertions(+), 30 deletions(-)

diff --git a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
index c09f1cf..2860a8f 100644
--- a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
@@ -38,13 +38,13 @@ public class GeodeConnectorConfig extends AbstractConfig {
   public static final String DEFAULT_LOCATOR = "localhost[10334]";
   public static final String SECURITY_CLIENT_AUTH_INIT = 
"security-client-auth-init";
   private static final String DEFAULT_SECURITY_AUTH_INIT = 
"org.geode.kafka.security.SystemPropertyAuthInit";
-  public static final String SECURITY_USER = "securityUsername";
-  public static final String SECURITY_PASSWORD= "securityPassword";
+  public static final String SECURITY_USER = "security-username";
+  public static final String SECURITY_PASSWORD= "security-password";
 
   protected final int taskId;
   protected List<LocatorHostPort> locatorHostPorts;
   private String securityClientAuthInit;
-  private String securityUser;
+  private String securityUserName;
   private String securityPassword;
 
   //Just for testing
@@ -64,7 +64,7 @@ public class GeodeConnectorConfig extends AbstractConfig {
     super(configDef, connectorProperties);
     taskId = getInt(TASK_ID);
     locatorHostPorts = parseLocators(getString(GeodeConnectorConfig.LOCATORS));
-    securityUser = getString(SECURITY_USER);
+    securityUserName = getString(SECURITY_USER);
     securityPassword = getString(SECURITY_PASSWORD);
     securityClientAuthInit = getString(SECURITY_CLIENT_AUTH_INIT);
     //if we registered a username/password instead of auth init, we should use 
the default auth init if one isn't specified
@@ -160,7 +160,15 @@ public class GeodeConnectorConfig extends AbstractConfig {
     return securityClientAuthInit;
   }
 
+  public String getSecurityUserName() {
+    return securityUserName;
+  }
+
+  public String getSecurityPassword() {
+    return securityPassword;
+  }
+
   public boolean usesSecurity() {
-    return securityClientAuthInit != null || securityUser != null;
+    return securityClientAuthInit != null || securityUserName != null;
   }
 }
diff --git a/src/main/java/org/geode/kafka/GeodeContext.java b/src/main/java/org/geode/kafka/GeodeContext.java
index 2858e4d..6190ef2 100644
--- a/src/main/java/org/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/geode/kafka/GeodeContext.java
@@ -17,6 +17,7 @@ package org.geode.kafka;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.geode.cache.query.CqResults;
 import org.apache.kafka.connect.errors.ConnectException;
 
 import org.apache.geode.cache.client.ClientCache;
@@ -28,6 +29,8 @@ import org.apache.geode.cache.query.CqQuery;
 import org.apache.geode.cache.query.RegionNotFoundException;
 
 import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
 
 public class GeodeContext {
 
@@ -37,15 +40,15 @@ public class GeodeContext {
   public GeodeContext() {}
 
   public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
-      String durableClientId, String durableClientTimeout, String 
securityAuthInit, boolean usesSecurity) {
+      String durableClientId, String durableClientTimeout, String 
securityAuthInit, String securityUserName, String securityPassword, boolean 
usesSecurity) {
     clientCache = createClientCache(locatorHostPortList, durableClientId, 
durableClientTimeout,
-        securityAuthInit, usesSecurity);
+        securityAuthInit, securityUserName, securityPassword, usesSecurity);
     return clientCache;
   }
 
   public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
-      String securityAuthInit, boolean usesSecurity) {
-    clientCache = createClientCache(locatorHostPortList, "", "", 
securityAuthInit, usesSecurity);
+      String securityAuthInit, String securityUserName, String 
securityPassword, boolean usesSecurity) {
+    clientCache = createClientCache(locatorHostPortList, "", "", 
securityAuthInit, securityUserName, securityPassword, usesSecurity);
     return clientCache;
   }
 
@@ -54,10 +57,14 @@ public class GeodeContext {
   }
 
   public ClientCache createClientCache(List<LocatorHostPort> locators, String 
durableClientName,
-      String durableClientTimeOut, String securityAuthInit, boolean 
usesSecurity) {
+      String durableClientTimeOut, String securityAuthInit, String 
securityUserName, String securityPassword, boolean usesSecurity) {
     ClientCacheFactory ccf = new ClientCacheFactory();
 
-    if (usesSecurity ) {
+    if (usesSecurity) {
+      if (securityUserName != null && securityPassword != null) {
+        ccf.set(SECURITY_USER, securityUserName);
+        ccf.set(SECURITY_PASSWORD, securityPassword);
+      }
       ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
     }
     if (!durableClientName.equals("")) {
@@ -85,8 +92,8 @@ public class GeodeContext {
     }
   }
 
-  public Collection newCqWithInitialResults(String name, String query, 
CqAttributes cqAttributes,
-      boolean isDurable) throws ConnectException {
+  public CqResults newCqWithInitialResults(String name, String query, 
CqAttributes cqAttributes,
+                                                   boolean isDurable) throws 
ConnectException {
     try {
       CqQuery cq = clientCache.getQueryService().newCq(name, query, 
cqAttributes, isDurable);
       return cq.executeWithInitialResults();
diff --git a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
index db48699..cc525a2 100644
--- a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
+++ b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
@@ -27,8 +27,8 @@ public class SystemPropertyAuthInit implements AuthInitialize {
   public Properties getCredentials(Properties securityProps, DistributedMember 
server,
       boolean isPeer) throws AuthenticationFailedException {
     Properties extractedProperties = new Properties();
-    extractedProperties.put("security-username", 
System.getProperty(GeodeConnectorConfig.SECURITY_USER));
-    extractedProperties.put("security-password", 
System.getProperty(GeodeConnectorConfig.SECURITY_PASSWORD));
+    extractedProperties.put("security-username", 
securityProps.get("security-username"));
+    extractedProperties.put("security-password", 
securityProps.get("security-password"));
     return extractedProperties;
   }
 }
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
index 4552e09..7db384f 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -61,7 +61,7 @@ public class GeodeKafkaSinkTask extends SinkTask {
       configure(geodeConnectorConfig);
       geodeContext = new GeodeContext();
       geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
-          geodeConnectorConfig.getSecurityClientAuthInit(), 
geodeConnectorConfig.usesSecurity());
+          geodeConnectorConfig.getSecurityClientAuthInit(), 
geodeConnectorConfig.getSecurityUserName(), 
geodeConnectorConfig.getSecurityPassword(), 
geodeConnectorConfig.usesSecurity());
       regionNameToRegion = createProxyRegions(topicToRegions.values());
     } catch (Exception e) {
       logger.error("Unable to start sink task", e);
diff --git a/src/main/java/org/geode/kafka/source/GeodeEvent.java b/src/main/java/org/geode/kafka/source/GeodeEvent.java
index 2333955..5b51d07 100644
--- a/src/main/java/org/geode/kafka/source/GeodeEvent.java
+++ b/src/main/java/org/geode/kafka/source/GeodeEvent.java
@@ -22,18 +22,29 @@ import org.apache.geode.cache.query.CqEvent;
 public class GeodeEvent {
 
   private String regionName;
-  private CqEvent event;
+  private Object key;
+  private Object value;
 
   public GeodeEvent(String regionName, CqEvent event) {
+    this(regionName, event.getKey(), event.getNewValue());
+  }
+
+  public GeodeEvent(String regionName, Object key, Object value) {
     this.regionName = regionName;
-    this.event = event;
+    this.key = key;
+    this.value = value;
   }
 
   public String getRegionName() {
     return regionName;
   }
 
-  public CqEvent getEvent() {
-    return event;
+  public Object getKey() {
+    return key;
   }
+
+  public Object getValue() {
+    return value;
+  }
+
 }
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
index b829335..b1c289f 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -21,6 +21,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import org.apache.geode.cache.query.CqResults;
+import org.apache.geode.cache.query.Struct;
 import org.geode.kafka.GeodeContext;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -68,7 +70,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
       geodeContext = new GeodeContext();
       geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
           geodeConnectorConfig.getDurableClientId(), 
geodeConnectorConfig.getDurableClientTimeout(),
-          geodeConnectorConfig.getSecurityClientAuthInit(), 
geodeConnectorConfig.usesSecurity());
+          geodeConnectorConfig.getSecurityClientAuthInit(), 
geodeConnectorConfig.getSecurityUserName(), 
geodeConnectorConfig.getSecurityPassword(), 
geodeConnectorConfig.usesSecurity());
 
       batchSize = geodeConnectorConfig.getBatchSize();
       eventBufferSupplier = new 
SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
@@ -98,7 +100,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
         List<String> topics = regionToTopics.get(regionName);
         for (String topic : topics) {
           records.add(new SourceRecord(sourcePartitions.get(regionName), 
OFFSET_DEFAULT, topic,
-              null, event.getEvent().getKey(), null, 
event.getEvent().getNewValue()));
+              null, event.getKey(), null, event.getValue()));
         }
       }
       return records;
@@ -134,12 +136,11 @@ public class GeodeKafkaSourceTask extends SourceTask {
     CqAttributes cqAttributes = cqAttributesFactory.create();
     try {
       if (loadEntireRegion) {
-        Collection<CqEvent> events =
+        CqResults events =
             geodeContext.newCqWithInitialResults(generateCqName(taskId, 
cqPrefix, regionName),
                 "select * from /" + regionName, cqAttributes,
                 isDurable);
-        eventBuffer.get().addAll(
-            events.stream().map(e -> new GeodeEvent(regionName, 
e)).collect(Collectors.toList()));
+        eventBuffer.get().addAll((Collection<GeodeEvent>)events.stream().map(e 
-> new GeodeEvent(regionName, ((Struct)e).get("key"), 
((Struct)e).get("value"))).collect(Collectors.toList()));
       } else {
         geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName),
             "select * from /" + regionName, cqAttributes,
diff --git a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 29a901a..7901426 100644
--- a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -36,6 +36,15 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqResults;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.Struct;
+import org.apache.geode.cache.query.internal.LinkedStructSet;
+import org.apache.geode.cache.query.internal.ResultsBag;
+import org.apache.geode.cache.query.internal.ResultsBag;
+import org.apache.geode.cache.query.internal.StructImpl;
+import org.apache.geode.cache.query.internal.types.StructTypeImpl;
 import org.geode.kafka.GeodeContext;
 import org.junit.Test;
 
@@ -52,9 +61,9 @@ public class GeodeKafkaSourceTaskTest {
     BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
     boolean loadEntireRegion = true;
     boolean isDurable = false;
-    List<CqEvent> fakeInitialResults = new LinkedList<>();
+    CqResults fakeInitialResults = new ResultsBag();
     for (int i = 0; i < 10; i++) {
-      fakeInitialResults.add(mock(CqEvent.class));
+      fakeInitialResults.add(mock(Struct.class));
     }
 
     when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), 
anyBoolean()))
@@ -71,7 +80,7 @@ public class GeodeKafkaSourceTaskTest {
     BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
     boolean loadEntireRegion = false;
     boolean isDurable = false;
-    List<CqEvent> fakeInitialResults = new LinkedList<>();
+    CqResults fakeInitialResults = new ResultsBag();
     for (int i = 0; i < 10; i++) {
       fakeInitialResults.add(mock(CqEvent.class));
     }
@@ -92,7 +101,7 @@ public class GeodeKafkaSourceTaskTest {
     boolean isDurable = false;
 
     when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), 
anyBoolean()))
-        .thenReturn(new ArrayList());
+        .thenReturn(mock(CqResults.class));
     GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
     GeodeKafkaSourceListener listener =
         task.installListenersToRegion(geodeContext, 1, 
createEventBufferSupplier(eventBuffer),
@@ -140,7 +149,7 @@ public class GeodeKafkaSourceTaskTest {
 
     GeodeContext geodeContext = mock(GeodeContext.class);
     when(geodeContext.getClientCache()).thenReturn(clientCache);
-
+    when(geodeContext.newCqWithInitialResults(anyString(), anyString(), 
any(CqAttributes.class), anyBoolean())).thenReturn(new ResultsBag());
     Map<String, List<String>> regionToTopicsMap = new HashMap<>();
     regionToTopicsMap.put("region1", new ArrayList());
 

Reply via email to