This is an automated email from the ASF dual-hosted git repository.
jasonhuynh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7d7a77d Added SystemPropertyAuthInit Test Removed unused test method
corrected spelling mistake
7d7a77d is described below
commit 7d7a77d715bf26365c8e3776302d498898b78391
Author: Jason Huynh <[email protected]>
AuthorDate: Fri Feb 14 10:27:58 2020 -0800
Added SystemPropertyAuthInit Test
Removed unused test method
corrected spelling mistake
---
.../java/org/geode/kafka/GeodeConnectorConfig.java | 7 ------
.../org/geode/kafka/sink/GeodeKafkaSinkTask.java | 2 +-
.../kafka/security/SystemPropertyAuthInitTest.java | 27 ++++++++++++++++++++++
.../geode/kafka/sink/GeodeKafkaSinkTaskTest.java | 25 ++++++++++++++++++++
.../org/geode/kafka/sink/GeodeKafkaSinkTest.java | 5 ++++
5 files changed, 58 insertions(+), 8 deletions(-)
diff --git a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
index cc151a4..7fbfb55 100644
--- a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
@@ -54,13 +54,6 @@ public class GeodeConnectorConfig extends AbstractConfig {
taskId = 0;
}
- // Just for testing
- protected GeodeConnectorConfig(Map<String, String> props) {
- super(new ConfigDef(), props);
- taskId = 0;
- }
-
-
public GeodeConnectorConfig(ConfigDef configDef, Map<String, String> connectorProperties) {
super(configDef, connectorProperties);
taskId = getInt(TASK_ID);
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
index be44356..7c77c7f 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -138,7 +138,7 @@ public class GeodeKafkaSinkTask extends SinkTask {
return geodeContext.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
.create(regionName);
} catch (RegionExistsException e) {
- // Each task is a seperate parallel task controlled by kafka.
+ // Each task is a separate parallel task controlled by kafka.
return geodeContext.getClientCache().getRegion(regionName);
}
}
diff --git a/src/test/java/org/geode/kafka/security/SystemPropertyAuthInitTest.java b/src/test/java/org/geode/kafka/security/SystemPropertyAuthInitTest.java
new file mode 100644
index 0000000..cdfd48c
--- /dev/null
+++ b/src/test/java/org/geode/kafka/security/SystemPropertyAuthInitTest.java
@@ -0,0 +1,27 @@
+package org.geode.kafka.security;
+
+import org.apache.geode.security.AuthInitialize;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
+import static org.junit.Assert.assertEquals;
+
+public class SystemPropertyAuthInitTest {
+
+ @Test
+ public void userNameAndPasswordAreObtainedFromSecurityProps() {
+ SystemPropertyAuthInit auth = new SystemPropertyAuthInit();
+ String userName = "someUsername";
+ String password = "somePassword";
+
+ Properties securityProps = new Properties();
+ securityProps.put(SECURITY_USER, userName);
+ securityProps.put(SECURITY_PASSWORD, password);
+ Properties credentials = auth.getCredentials(securityProps, null, true);
+ assertEquals(credentials.get((AuthInitialize.SECURITY_USERNAME)), userName);
+ assertEquals(credentials.get((AuthInitialize.SECURITY_PASSWORD)), password);
+ }
+}
diff --git a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTaskTest.java b/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
index f7feb6f..32925fb 100644
--- a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
+++ b/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
@@ -16,6 +16,7 @@ package org.geode.kafka.sink;
import static org.geode.kafka.sink.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
import static org.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -69,4 +70,28 @@ public class GeodeKafkaSinkTaskTest {
assertTrue(batchRecordsMap.containsKey("region"));
verify(batchRecords, times(1)).addUpdateOperation(topicRecord, nullMeansRemove);
}
+
+ @Test
+ public void newBatchRecordsAreCreatedIfOneDoesntExist() {
+ boolean nullMeansRemove = true;
+ GeodeKafkaSinkTask task = new GeodeKafkaSinkTask();
+ HashMap<String, String> props = createTestSinkProps(nullMeansRemove);
+
+ SinkRecord topicRecord = mock(SinkRecord.class);
+ when(topicRecord.topic()).thenReturn("topic");
+ when(topicRecord.value()).thenReturn("value");
+ when(topicRecord.key()).thenReturn("key");
+
+ List<SinkRecord> records = new ArrayList();
+ records.add(topicRecord);
+
+ HashMap<String, Region> regionNameToRegion = new HashMap<>();
+ GeodeSinkConnectorConfig geodeSinkConnectorConfig = new GeodeSinkConnectorConfig(props);
+ HashMap<String, BatchRecords> batchRecordsMap = new HashMap();
+ task.configure(geodeSinkConnectorConfig);
+ task.setRegionNameToRegion(regionNameToRegion);
+
+ task.put(records, batchRecordsMap);
+ assertNotNull(batchRecordsMap.get("region"));
+ }
}
diff --git a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTest.java b/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTest.java
index c82fec4..28e7033 100644
--- a/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTest.java
+++ b/src/test/java/org/geode/kafka/sink/GeodeKafkaSinkTest.java
@@ -29,6 +29,11 @@ import org.junit.Test;
public class GeodeKafkaSinkTest {
@Test
+ public void taskClassReferencesCorrectSinkTaskClass() {
+ assertEquals(GeodeKafkaSinkTask.class, new GeodeKafkaSink().taskClass());
+ }
+
+ @Test
public void taskConfigsCreatesMaxNumberOfTasks() {
GeodeKafkaSink sink = new GeodeKafkaSink();
Map<String, String> props = new HashMap();