This is an automated email from the ASF dual-hosted git repository.
jasonhuynh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7b4ee00 Fixed bug where tasks were sharing the same props Modified
source and sink tasks to share all region and topic bindings Each source task
is able to update kafka but only one will register for a specific region
binding Each sink task is able to update the appropriate Geode region
7b4ee00 is described below
commit 7b4ee0037025e784022900b86c1a111e369900e6
Author: Jason Huynh <[email protected]>
AuthorDate: Tue Feb 4 13:35:30 2020 -0800
Fixed bug where tasks were sharing the same props
Modified source and sink tasks to share all region and topic bindings
Each source task is able to update kafka but only one will register for a
specific region binding
Each sink task is able to update the appropriate Geode region
---
README.md | 5 +-
.../java/geode/kafka/GeodeConnectorConfig.java | 7 +-
src/main/java/geode/kafka/sink/BatchRecords.java | 13 +-
src/main/java/geode/kafka/sink/GeodeKafkaSink.java | 8 +-
.../java/geode/kafka/sink/GeodeKafkaSinkTask.java | 35 ++-
.../geode/kafka/sink/GeodeSinkConnectorConfig.java | 6 -
.../java/geode/kafka/source/GeodeKafkaSource.java | 10 +-
.../geode/kafka/source/GeodeKafkaSourceTask.java | 20 +-
.../kafka/source/GeodeSourceConnectorConfig.java | 20 +-
.../java/geode/kafka/GeodeConnectorConfigTest.java | 6 +-
...afkaSinkTaskTest.java => GeodeContextTest.java} | 4 +-
.../java/geode/kafka/GeodeKafkaTestCluster.java | 260 ++++++++++++++++-----
.../java/geode/kafka/WorkerAndHerderCluster.java | 4 +-
.../java/geode/kafka/WorkerAndHerderWrapper.java | 6 +-
.../geode/kafka/sink/GeodeKafkaSinkTaskTest.java | 57 +++++
.../java/geode/kafka/sink/GeodeKafkaSinkTest.java | 37 ++-
.../kafka/source/GeodeKafkaSourceTaskTest.java | 12 +-
.../geode/kafka/source/GeodeKafkaSourceTest.java | 45 ++++
18 files changed, 440 insertions(+), 115 deletions(-)
diff --git a/README.md b/README.md
index 06074a0..c80cdb4 100644
--- a/README.md
+++ b/README.md
@@ -57,7 +57,6 @@ bin/connect-standalone.sh
config/connect-standalone.properties config/connect-ge
|regionToTopics| yes | A comma separated list of "one region to many topics"
mappings. Each mapping is surrounded by brackets. For example
"[regionName:topicName], "[anotherRegion: topicName, anotherTopic]" | None.
This is required to be set in the source connector properties|
|security-client-auth-init| no | Point to class that implements the
[AuthInitialize
Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)
|geodeConnectorBatchSize| no | Maximum number of records to return on each
poll| 100 |
-|geodeConnectorQueueSize| no | Maximum number of entries in the connector
queue before backing up all Geode cq listeners sharing the task queue | 10000 |
| loadEntireRegion| no| Determines if we should queue up all entries that
currently exist in the region. This allows us to copy existing region data.
Will be replayed whenever a task needs to re-register a cq| true |
|durableClientIdPrefix| no | Prefix string for tasks to append to when
registering as a durable client. If empty string, will not register as a
durable client | "" |
| durableClientTimeout| no | How long in milliseconds to persist values in
Geode's durable queue before the queue is invalidated| 60000 |
@@ -69,6 +68,10 @@ bin/connect-standalone.sh
config/connect-standalone.properties config/connect-ge
* Consider modifying Kafka Properties like tasks.max in the source and sink
parameters.
+Extra Details
+* Each source task will push events off the shared queue to
Kafka; however, only one task will register a cq with Apache Geode
+* Each sink task is able to update any of the configured Apache Geode regions.
+
### Possible Upcoming Features:
* Formatters - Possibly a JSON to and from PDX formatter
* Security - security settings for Geode
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java
b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index ac9a31f..efffdcd 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -16,6 +16,7 @@ package geode.kafka;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -62,8 +63,8 @@ public class GeodeConnectorConfig {
* @return mapping of regionName to list of topics to update
*/
public static Map<String, List<String>> parseRegionToTopics(String
combinedBindings) {
- if (combinedBindings == "" || combinedBindings == null){
- return null;
+ if (combinedBindings == null || combinedBindings.equals("")){
+ return new HashMap();
}
List<String> bindings = parseBindings(combinedBindings);
return bindings.stream().map(binding -> {
@@ -95,7 +96,7 @@ public class GeodeConnectorConfig {
}
public static String reconstructString(Collection<String> strings) {
- return strings.stream().collect(Collectors.joining("],[")) + "]";
+ return strings.stream().collect(Collectors.joining("],["));
}
List<LocatorHostPort> parseLocators(String locators) {
diff --git a/src/main/java/geode/kafka/sink/BatchRecords.java
b/src/main/java/geode/kafka/sink/BatchRecords.java
index 6baddfd..c10faaf 100644
--- a/src/main/java/geode/kafka/sink/BatchRecords.java
+++ b/src/main/java/geode/kafka/sink/BatchRecords.java
@@ -14,8 +14,11 @@
*/
package geode.kafka.sink;
+import geode.kafka.source.GeodeKafkaSourceTask;
import org.apache.geode.cache.Region;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
@@ -27,6 +30,7 @@ import java.util.Map;
* A collection of records to put/remove from a region
*/
public class BatchRecords {
+ private static final Logger logger =
LoggerFactory.getLogger(BatchRecords.class);
private Map updateMap;
private Collection removeList;
@@ -67,7 +71,12 @@ public class BatchRecords {
public void executeOperations(Region region) {
- region.putAll(updateMap);
- region.removeAll(removeList);
+ if (region != null) {
+ region.putAll(updateMap);
+ region.removeAll(removeList);
+ }
+ else {
+ logger.info("Unable to locate proxy region: " + region);
+ }
}
}
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
index bf93f62..1ccb385 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
@@ -48,14 +48,12 @@ public class GeodeKafkaSink extends SinkConnector {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
- Map<String, String> taskProps = new HashMap<>();
- taskProps.putAll(sharedProps);
- List<String> bindings =
GeodeConnectorConfig.parseStringByComma(taskProps.get(TOPIC_TO_REGION_BINDINGS));
- List<List<String>> bindingsPerTask =
ConnectorUtils.groupPartitions(bindings, maxTasks);
+ //All tasks will build up the topic to regions map. A few might not
use certain keys but we have no control over partitioning in kafka and which
tasks will fire
for (int i = 0; i < maxTasks; i++) {
+ Map<String, String> taskProps = new HashMap<>();
+ taskProps.putAll(sharedProps);
taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
- taskProps.put(TOPIC_TO_REGION_BINDINGS,
GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i)));
taskConfigs.add(taskProps);
}
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
index 66e528f..bcc5885 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -17,6 +17,7 @@ package geode.kafka.sink;
import geode.kafka.GeodeContext;
import geode.kafka.GeodeSinkConnectorConfig;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
@@ -39,6 +40,7 @@ public class GeodeKafkaSinkTask extends SinkTask {
private static final Logger logger =
LoggerFactory.getLogger(GeodeKafkaSinkTask.class);
private GeodeContext geodeContext;
+ private int taskId;
private Map<String, List<String>> topicToRegions;
private Map<String, Region> regionNameToRegion;
private boolean nullValuesMeansRemove = true;
@@ -56,24 +58,37 @@ public class GeodeKafkaSinkTask extends SinkTask {
public void start(Map<String, String> props) {
try {
GeodeSinkConnectorConfig geodeConnectorConfig = new
GeodeSinkConnectorConfig(props);
- logger.debug("GeodeKafkaSourceTask id:" +
geodeConnectorConfig.getTaskId() + " starting");
+ configure(geodeConnectorConfig);
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getSecurityClientAuthInit());
- topicToRegions = geodeConnectorConfig.getTopicToRegions();
regionNameToRegion = createProxyRegions(topicToRegions.values());
- nullValuesMeansRemove =
geodeConnectorConfig.getNullValuesMeanRemove();
} catch (Exception e) {
logger.error("Unable to start sink task", e);
throw e;
}
}
+ void configure(GeodeSinkConnectorConfig geodeConnectorConfig) {
+ logger.debug("GeodeKafkaSourceTask id:" +
geodeConnectorConfig.getTaskId() + " starting");
+ taskId = geodeConnectorConfig.getTaskId();
+ topicToRegions = geodeConnectorConfig.getTopicToRegions();
+ nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
+ }
+
+ //For tests only
+ void setRegionNameToRegion(Map<String, Region> regionNameToRegion) {
+ this.regionNameToRegion = regionNameToRegion;
+ }
+
@Override
public void put(Collection<SinkRecord> records) {
+ put(records, new HashMap());
+ }
+
+ void put(Collection<SinkRecord> records, Map<String, BatchRecords>
batchRecordsMap) {
//spin off a new thread to handle this operation? Downside is
ordering and retries...
- Map<String, BatchRecords> batchRecordsMap = new HashMap<>();
for (SinkRecord record : records) {
- updateRegionsByTopic(record, batchRecordsMap);
+ updateBatchForRegionByTopic(record, batchRecordsMap);
}
batchRecordsMap.entrySet().stream().forEach((entry) -> {
String region = entry.getKey();
@@ -82,7 +97,7 @@ public class GeodeKafkaSinkTask extends SinkTask {
});
}
- private void updateRegionsByTopic(SinkRecord sinkRecord, Map<String,
BatchRecords> batchRecordsMap) {
+ private void updateBatchForRegionByTopic(SinkRecord sinkRecord,
Map<String, BatchRecords> batchRecordsMap) {
Collection<String> regionsToUpdate =
topicToRegions.get(sinkRecord.topic());
for (String region : regionsToUpdate) {
updateBatchRecordsForRecord(sinkRecord, batchRecordsMap, region);
@@ -113,7 +128,13 @@ public class GeodeKafkaSinkTask extends SinkTask {
}
private Region createProxyRegion(String regionName) {
- return
geodeContext.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
+ try {
+ return
geodeContext.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
+ }
+ catch (RegionExistsException e) {
+ //Each task is a seperate parallel task controlled by kafka.
+ return geodeContext.getClientCache().getRegion(regionName);
+ }
}
@Override
diff --git a/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java
b/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java
index cd7b363..59a57cd 100644
--- a/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -26,12 +26,6 @@ public class GeodeSinkConnectorConfig extends
GeodeConnectorConfig {
private Map<String, List<String>> topicToRegions;
private final boolean nullValuesMeanRemove;
- //just for tests
- GeodeSinkConnectorConfig() {
- super();
- nullValuesMeanRemove =
Boolean.parseBoolean(DEFAULT_NULL_VALUES_MEAN_REMOVE);
- }
-
public GeodeSinkConnectorConfig(Map<String, String> connectorProperties) {
super(connectorProperties);
topicToRegions =
parseTopicToRegions(connectorProperties.get(TOPIC_TO_REGION_BINDINGS));
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java
b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
index 26821e1..b81401a 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -29,6 +29,7 @@ import java.util.Map;
import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
import static geode.kafka.GeodeConnectorConfig.LOCATORS;
import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.CQS_TO_REGISTER;
import static geode.kafka.source.GeodeSourceConnectorConfig.CQ_PREFIX;
import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_BATCH_SIZE;
import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
@@ -58,15 +59,14 @@ public class GeodeKafkaSource extends SourceConnector {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
- Map<String, String> taskProps = new HashMap<>();
- taskProps.putAll(sharedProps);
-
- List<String> bindings =
GeodeConnectorConfig.parseStringByComma(taskProps.get(REGION_TO_TOPIC_BINDINGS));
+ List<String> bindings =
GeodeConnectorConfig.parseStringByComma(sharedProps.get(REGION_TO_TOPIC_BINDINGS));
List<List<String>> bindingsPerTask =
ConnectorUtils.groupPartitions(bindings, maxTasks);
for (int i = 0; i < maxTasks; i++) {
+ Map<String, String> taskProps = new HashMap<>();
+ taskProps.putAll(sharedProps);
taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
- taskProps.put(REGION_TO_TOPIC_BINDINGS,
GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i)));
+ taskProps.put(CQS_TO_REGISTER,
GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i)));
taskConfigs.add(taskProps);
}
return taskConfigs;
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 8d827a1..c6cf6cb 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -34,7 +34,7 @@ import java.util.stream.Collectors;
import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE;
-import static org.apache.geode.pdx.internal.PeerTypeRegistration.REGION_NAME;
+import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
public class GeodeKafkaSourceTask extends SourceTask {
@@ -48,9 +48,11 @@ public class GeodeKafkaSourceTask extends SourceTask {
private GeodeContext geodeContext;
private GeodeSourceConnectorConfig geodeConnectorConfig;
+ private int taskId;
private Map<String, List<String>> regionToTopics;
+ private Collection<String> cqsToRegister;
private Map<String, Map<String, String>> sourcePartitions;
- private BlockingQueue<GeodeEvent> eventBuffer;
+ private static BlockingQueue<GeodeEvent> eventBuffer = new
LinkedBlockingQueue<>(100000);
private int batchSize;
@@ -69,15 +71,15 @@ public class GeodeKafkaSourceTask extends SourceTask {
public void start(Map<String, String> props) {
try {
geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
+ taskId = geodeConnectorConfig.getTaskId();
logger.debug("GeodeKafkaSourceTask id:" +
geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getDurableClientId(),
geodeConnectorConfig.getDurableClientTimeout(),
geodeConnectorConfig.getSecurityClientAuthInit());
batchSize = Integer.parseInt(props.get(BATCH_SIZE));
- int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
- eventBuffer = new LinkedBlockingQueue<>(queueSize);
regionToTopics = geodeConnectorConfig.getRegionToTopics();
+ cqsToRegister = geodeConnectorConfig.getCqsToRegister();
sourcePartitions =
createSourcePartitionsMap(regionToTopics.keySet());
String cqPrefix = geodeConnectorConfig.getCqPrefix();
@@ -85,10 +87,10 @@ public class GeodeKafkaSourceTask extends SourceTask {
boolean loadEntireRegion =
geodeConnectorConfig.getLoadEntireRegion();
installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer,
cqPrefix, loadEntireRegion);
} catch (Exception e) {
+ e.printStackTrace();
logger.error("Unable to start source task", e);
throw e;
}
-
}
@Override
@@ -100,7 +102,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
String regionName = event.getRegionName();
List<String> topics = regionToTopics.get(regionName);
for (String topic : topics) {
- records.add(new
SourceRecord(sourcePartitions.get(regionName), OFFSET_DEFAULT, topic, null,
event.getEvent().getNewValue()));
+ records.add(new
SourceRecord(sourcePartitions.get(regionName), OFFSET_DEFAULT, topic, null,
event.getEvent().getKey(), null, event.getEvent().getNewValue()));
}
}
return records;
@@ -117,7 +119,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig,
GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean
loadEntireRegion) {
boolean isDurable = geodeConnectorConfig.isDurable();
int taskId = geodeConnectorConfig.getTaskId();
- for (String region :
geodeConnectorConfig.getRegionToTopics().keySet()) {
+ for (String region : geodeConnectorConfig.getCqsToRegister()) {
installListenersToRegion(geodeContext, taskId, eventBuffer,
region, cqPrefix, loadEntireRegion, isDurable);
}
if (isDurable) {
@@ -154,9 +156,9 @@ public class GeodeKafkaSourceTask extends SourceTask {
Map<String, Map<String, String>>
createSourcePartitionsMap(Collection<String> regionNames) {
return regionNames.stream().map(regionName -> {
Map<String, String> sourcePartition = new HashMap<>();
- sourcePartition.put(REGION_NAME, regionName);
+ sourcePartition.put(REGION_PARTITION, regionName);
return sourcePartition;
- }).collect(Collectors.toMap(s -> s.get(REGION_NAME), s -> s));
+ }).collect(Collectors.toMap(s -> s.get(REGION_PARTITION), s -> s));
}
String generateCqName(int taskId, String cqPrefix, String regionName) {
diff --git a/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java
b/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java
index 1ccdf75..0b46d2b 100644
--- a/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -16,6 +16,7 @@ package geode.kafka.source;
import geode.kafka.GeodeConnectorConfig;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -33,8 +34,9 @@ public class GeodeSourceConnectorConfig extends
GeodeConnectorConfig {
/**
* Used as a key for source partitions
*/
- public static final String REGION = "region";
+ public static final String REGION_PARTITION = "regionPartition";
public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopics";
+ public static final String CQS_TO_REGISTER = "cqsToRegister";
public static final String BATCH_SIZE = "geodeConnectorBatchSize";
public static final String DEFAULT_BATCH_SIZE = "100";
@@ -52,19 +54,11 @@ public class GeodeSourceConnectorConfig extends
GeodeConnectorConfig {
private final boolean loadEntireRegion;
private Map<String, List<String>> regionToTopics;
-
- //just for tests
- protected GeodeSourceConnectorConfig() {
- super();
- durableClientId = "";
- durableClientIdPrefix = "";
- durableClientTimeout = "0";
- cqPrefix = DEFAULT_CQ_PREFIX;
- loadEntireRegion = Boolean.parseBoolean(DEFAULT_LOAD_ENTIRE_REGION);
- }
+ private Collection<String> cqsToRegister;
public GeodeSourceConnectorConfig(Map<String, String> connectorProperties)
{
super(connectorProperties);
+ cqsToRegister =
parseRegionToTopics(connectorProperties.get(CQS_TO_REGISTER)).keySet();
regionToTopics =
parseRegionToTopics(connectorProperties.get(REGION_TO_TOPIC_BINDINGS));
durableClientIdPrefix =
connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
if (isDurable(durableClientIdPrefix)) {
@@ -113,4 +107,8 @@ public class GeodeSourceConnectorConfig extends
GeodeConnectorConfig {
return regionToTopics;
}
+ public Collection<String> getCqsToRegister() {
+ return cqsToRegister;
+ }
+
}
diff --git a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
index 91230d3..84a3ec5 100644
--- a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -99,7 +100,6 @@ public class GeodeConnectorConfigTest {
,"[region1: topic1 ,topic3], [region2 :topic2]"});
}
-
@Test
@Parameters(method="oneToManyBindings")
public void reconstructBindingsToStringShouldReformAParsableString(String
value) {
@@ -107,6 +107,10 @@ public class GeodeConnectorConfigTest {
String reconstructString =
GeodeConnectorConfig.reconstructString(splitBindings);
splitBindings = GeodeConnectorConfig.parseBindings(reconstructString);
assertEquals(Arrays.toString(splitBindings.toArray()), 2,
splitBindings.size());
+ for(String topicOrRegion: splitBindings) {
+ assertFalse(topicOrRegion.contains("\\["));
+ assertFalse(topicOrRegion.contains("\\]"));
+ }
}
@Test
diff --git a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
b/src/test/java/geode/kafka/GeodeContextTest.java
similarity index 92%
copy from src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
copy to src/test/java/geode/kafka/GeodeContextTest.java
index 3f670d2..1f92bbb 100644
--- a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
+++ b/src/test/java/geode/kafka/GeodeContextTest.java
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package geode.kafka.sink;
+package geode.kafka;
-public class GeodeKafkaSinkTaskTest {
+public class GeodeContextTest {
}
diff --git a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
index de1121c..121a978 100644
--- a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
@@ -15,7 +15,6 @@
package geode.kafka;
import kafka.admin.RackAwareMode;
-import geode.kafka.source.GeodeKafkaSource;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.geode.cache.Region;
@@ -49,6 +48,7 @@ import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -80,51 +80,44 @@ public class GeodeKafkaTestCluster {
startZooKeeper();
startKafka();
startGeode();
- createTopic();
- startWorker();
- consumer = createConsumer();
}
- @Before
- public void beforeTests() {
- }
-
- @After
- public void afterTests() {
-
- }
@AfterClass
public static void shutdown() {
workerAndHerderCluster.stop();
KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
- AdminZkClient adminZkClient = new AdminZkClient(zkClient);
- adminZkClient.deleteTopic(TEST_TOPIC_FOR_SOURCE);
- adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK);
-
+// AdminZkClient adminZkClient = new AdminZkClient(zkClient);
+// adminZkClient.deleteTopic(TEST_TOPIC_FOR_SOURCE);
+// adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK);
+ zkClient.close();
kafkaLocalCluster.stop();
geodeLocalCluster.stop();
}
- private static void startWorker() throws IOException, InterruptedException {
+ private static void startWorker(int maxTasks) throws IOException,
InterruptedException {
workerAndHerderCluster = new WorkerAndHerderCluster();
- workerAndHerderCluster.start();
+ workerAndHerderCluster.start(String.valueOf(maxTasks));
Thread.sleep(20000);
}
- private static void createTopic() {
+ private static void createTopic(String topicName, int numPartitions, int
replicationFactor) {
KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
Properties topicProperties = new Properties();
topicProperties.put("flush.messages", "1");
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
- adminZkClient.createTopic(TEST_TOPIC_FOR_SOURCE,1
- ,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
- adminZkClient.createTopic(TEST_TOPIC_FOR_SINK,1
- ,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
+ adminZkClient.createTopic(topicName, numPartitions,replicationFactor,
topicProperties, RackAwareMode.Disabled$.MODULE$);
+ }
+
+ private static void deleteTopic(String topicName) {
+ KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
+ 15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
+ AdminZkClient adminZkClient = new AdminZkClient(zkClient);
+ adminZkClient.deleteTopic(topicName);
}
private ClientCache createGeodeClient() {
@@ -165,14 +158,6 @@ public class GeodeKafkaTestCluster {
props.put("host.name", "localHost");
props.put("port", BROKER_PORT);
props.put("offsets.topic.replication.factor", "1");
- props.put("log.flush.interval.messages", "1");
- props.put("log.flush.interval.ms", "10");
-
-
- //Connector configs
- props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
GeodeKafkaSource.class.getName());
- props.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
- props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
//Specifically GeodeKafka connector configs
return props;
@@ -189,6 +174,7 @@ public class GeodeKafkaTestCluster {
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Create the consumer using props.
final Consumer<String, String> consumer =
@@ -214,34 +200,204 @@ public class GeodeKafkaTestCluster {
}
@Test
- public void endToEndSourceTest() {
- ClientCache client = createGeodeClient();
- Region region =
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SOURCE);
-
- //right now just verify something makes it end to end
- AtomicInteger valueReceived = new AtomicInteger(0);
- await().atMost(10, TimeUnit.SECONDS).until(() -> {
- region.put("KEY", "VALUE" + System.currentTimeMillis());
- ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(4));
- for (ConsumerRecord<String, String> record: records) {
- valueReceived.incrementAndGet();
+ public void endToEndSourceTest() throws Exception {
+ try {
+ createTopic(TEST_TOPIC_FOR_SOURCE, 1, 1);
+ startWorker(1);
+ consumer = createConsumer();
+
+ ClientCache client = createGeodeClient();
+ Region region =
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SOURCE);
+
+ for (int i = 0; i < 10 ; i++) {
+ region.put("KEY" + i, "VALUE" + i);
+ }
+
+ AtomicInteger valueReceived = new AtomicInteger(0);
+ await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(2));
+ for (ConsumerRecord<String, String> record : records) {
+ valueReceived.incrementAndGet();
+ }
+ return valueReceived.get() == 10;
+ });
+ }
+ finally {
+ deleteTopic(TEST_TOPIC_FOR_SOURCE);
+ }
+ }
+
+
+ @Test
+ public void endToEndSourceSingleRegionMultiTaskMultiPartitionTest() throws
Exception {
+ try {
+ createTopic(TEST_TOPIC_FOR_SOURCE, 2, 1);
+ startWorker(1);
+ consumer = createConsumer();
+
+ ClientCache client = createGeodeClient();
+ Region region =
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SOURCE);
+
+ for (int i = 0; i < 10 ; i++) {
+ region.put("KEY" + i, "VALUE" + i);
}
- return valueReceived.get() == 10;
- });
+
+ AtomicInteger valueReceived = new AtomicInteger(0);
+ await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(2));
+ for (ConsumerRecord<String, String> record : records) {
+ valueReceived.incrementAndGet();
+ }
+ return valueReceived.get() == 10;
+ });
+ }
+ finally {
+ deleteTopic(TEST_TOPIC_FOR_SOURCE);
+ }
}
@Test
- public void endToEndSinkTest() {
- ClientCache client = createGeodeClient();
- Region region =
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+ public void
endToEndSourceSingleRegionMultiTaskMultiPartitionWithMoreTasksThanPartitionsTest()
throws Exception {
+ try {
+ createTopic(TEST_TOPIC_FOR_SOURCE, 2, 1);
+ startWorker(5);
+ consumer = createConsumer();
+
+ ClientCache client = createGeodeClient();
+ Region region =
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SOURCE);
+
+ for (int i = 0; i < 10 ; i++) {
+ region.put("KEY" + i, "VALUE" + i);
+ }
- Producer<String, String> producer = createProducer();
- for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE"
+ i));
+ AtomicInteger valueReceived = new AtomicInteger(0);
+ await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(2));
+ for (ConsumerRecord<String, String> record : records) {
+ valueReceived.incrementAndGet();
+ }
+ return valueReceived.get() == 10;
+ });
+ }
+ finally {
+ deleteTopic(TEST_TOPIC_FOR_SOURCE);
}
+ }
- int i = 0;
- await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10,
region.sizeOnServer()));
+ @Test
+ public void endToEndSinkTest() throws Exception {
+ try {
+ createTopic(TEST_TOPIC_FOR_SINK, 1, 1);
+ startWorker(1);
+ consumer = createConsumer();
+
+ ClientCache client = createGeodeClient();
+ Region region =
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+
+ Producer<String, String> producer = createProducer();
+ for (int i = 0; i < 10; i++) {
+ producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i,
"VALUE" + i));
+ }
+
+ int i = 0;
+ await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(10, region.sizeOnServer()));
+ }
+ finally {
+ deleteTopic(TEST_TOPIC_FOR_SINK);
+ }
+ }
+
+
+ @Test
+ public void
endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicSinkTest()
throws Exception {
+ try {
+ createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
+ startWorker(5);
+ consumer = createConsumer();
+
+ ClientCache client = createGeodeClient();
+ Region region =
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+
+ Producer<String, String> producer = createProducer();
+ for (int i = 0; i < 10; i++) {
+ producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i,
"VALUE" + i));
+ }
+
+ int i = 0;
+ await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(10, region.sizeOnServer()));
+ }
+ finally {
+ deleteTopic(TEST_TOPIC_FOR_SINK);
+ }
+ }
+
+ @Test
+ public void
endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicWithMoreWorkersSinkTest()
throws Exception {
+ try {
+ createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
+ startWorker(15);
+ consumer = createConsumer();
+
+ ClientCache client = createGeodeClient();
+ Region region =
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+
+ Producer<String, String> producer = createProducer();
+ for (int i = 0; i < 10; i++) {
+ producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i,
"VALUE" + i));
+ }
+
+ int i = 0;
+ await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(10, region.sizeOnServer()));
+ }
+ finally {
+ deleteTopic(TEST_TOPIC_FOR_SINK);
+ }
+ }
+
+ @Test
+ public void endToEndWithOneTaskForASingleBindingLessTasksThanPartitions()
throws Exception {
+ try {
+ createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
+ startWorker(5);
+ consumer = createConsumer();
+
+ ClientCache client = createGeodeClient();
+ Region region =
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+
+ Producer<String, String> producer = createProducer();
+ for (int i = 0; i < 10; i++) {
+ producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i,
"VALUE" + i));
+ }
+
+ int i = 0;
+ await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(10, region.sizeOnServer()));
+ }
+ finally {
+ deleteTopic(TEST_TOPIC_FOR_SINK);
+ }
+ }
+
+ @Test
+ public void endToEndWithOneTaskForASingleBindingMoreTasksThanPartitions()
throws Exception {
+ try {
+ createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
+ startWorker(5);
+ consumer = createConsumer();
+
+ ClientCache client = createGeodeClient();
+ Region region =
client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+
+ Producer<String, String> producer = createProducer();
+ for (int i = 0; i < 10; i++) {
+ producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, i,
UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+ }
+
+ int i = 0;
+ await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(10, region.sizeOnServer()));
+ }
+ finally {
+ deleteTopic(TEST_TOPIC_FOR_SINK);
+ }
}
}
diff --git a/src/test/java/geode/kafka/WorkerAndHerderCluster.java
b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
index 59d0b02..c388a5a 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderCluster.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
@@ -24,8 +24,8 @@ public class WorkerAndHerderCluster {
workerAndHerder = new JavaProcess(WorkerAndHerderWrapper.class);
}
- public void start() throws IOException, InterruptedException {
- workerAndHerder.exec();
+ public void start(String maxTasks) throws IOException,
InterruptedException {
+ workerAndHerder.exec(maxTasks);
}
diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
index 385209e..24427aa 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
@@ -41,6 +41,8 @@ import static
geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BIND
public class WorkerAndHerderWrapper {
public static void main(String[] args) throws IOException {
+ String maxTasks = args[0];
+
Map props = new HashMap();
props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("offset.storage.file.filename", "/tmp/connect.offsets");
@@ -68,7 +70,7 @@ public class WorkerAndHerderWrapper {
Map<String, String> sourceProps = new HashMap<>();
sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
GeodeKafkaSource.class.getName());
sourceProps.put(ConnectorConfig.NAME_CONFIG,
"geode-kafka-source-connector");
- sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+ sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
sourceProps.put(REGION_TO_TOPIC_BINDINGS,
TEST_REGION_TO_TOPIC_BINDINGS);
herder.putConnectorConfig(
@@ -79,7 +81,7 @@ public class WorkerAndHerderWrapper {
Map<String, String> sinkProps = new HashMap<>();
sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
GeodeKafkaSink.class.getName());
sinkProps.put(ConnectorConfig.NAME_CONFIG,
"geode-kafka-sink-connector");
- sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+ sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
sinkProps.put(TOPIC_TO_REGION_BINDINGS, TEST_TOPIC_TO_REGION_BINDINGS);
sinkProps.put("topics", TEST_TOPIC_FOR_SINK);
diff --git a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
index 3f670d2..ed8f040 100644
--- a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
+++ b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
@@ -14,5 +14,62 @@
*/
package geode.kafka.sink;
+import geode.kafka.GeodeSinkConnectorConfig;
+import org.apache.geode.cache.Region;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static geode.kafka.GeodeConnectorConfig.LOCATORS;
+import static geode.kafka.GeodeConnectorConfig.TASK_ID;
+import static geode.kafka.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
+import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
public class GeodeKafkaSinkTaskTest {
+
+ private HashMap<String, String> createTestSinkProps(boolean
nullMeansRemove) {
+ HashMap<String, String> props = new HashMap<>();
+ props.put(TOPIC_TO_REGION_BINDINGS, "[topic:region]");
+ props.put(TASK_ID, "0");
+ props.put(NULL_VALUES_MEAN_REMOVE, String.valueOf(nullMeansRemove));
+ props.put(LOCATORS, "localhost[10334]");
+ return props;
+ }
+
+ @Test
+ public void putRecordsAddsToRegionBatchRecords() {
+ boolean nullMeansRemove = true;
+ GeodeKafkaSinkTask task = new GeodeKafkaSinkTask();
+ HashMap<String, String> props = createTestSinkProps(nullMeansRemove);
+
+ SinkRecord topicRecord = mock(SinkRecord.class);
+ when(topicRecord.topic()).thenReturn("topic");
+ when(topicRecord.value()).thenReturn("value");
+ when(topicRecord.key()).thenReturn("key");
+
+ List<SinkRecord> records = new ArrayList();
+ records.add(topicRecord);
+
+ HashMap<String, Region> regionNameToRegion = new HashMap<>();
+ GeodeSinkConnectorConfig geodeSinkConnectorConfig = new
GeodeSinkConnectorConfig(props);
+ HashMap<String, BatchRecords> batchRecordsMap = new HashMap();
+ BatchRecords batchRecords = mock(BatchRecords.class);
+ batchRecordsMap.put("region", batchRecords);
+ task.configure(geodeSinkConnectorConfig);
+ task.setRegionNameToRegion(regionNameToRegion);
+
+ task.put(records, batchRecordsMap);
+ assertTrue(batchRecordsMap.containsKey("region"));
+ verify(batchRecords, times(1)).addUpdateOperation(topicRecord,
nullMeansRemove);
+ }
}
diff --git a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java
b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java
index 7205741..633c7ae 100644
--- a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java
+++ b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java
@@ -16,15 +16,50 @@ package geode.kafka.sink;
import org.junit.Test;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import static geode.kafka.GeodeConnectorConfig.TASK_ID;
+import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
public class GeodeKafkaSinkTest {
@Test
- public void test() {
+ public void taskConfigsCreatesMaxNumberOfTasks() {
+ GeodeKafkaSink sink = new GeodeKafkaSink();
+ Map<String, String> props = new HashMap();
+ props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
+ sink.start(props);
+ Collection<Map<String,String>> tasks = sink.taskConfigs(5);
+ assertEquals(5, tasks.size());
+ }
+
+ @Test
+ public void sinkTaskConfigsAllAssignedEntireTopicToRegionBinding() {
+ GeodeKafkaSink sink = new GeodeKafkaSink();
+ Map<String, String> props = new HashMap();
+ props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
+ sink.start(props);
+ Collection<Map<String,String>> tasks = sink.taskConfigs(5);
+ for(Map<String, String> prop : tasks) {
+ assertEquals("[someTopic:someRegion]",
prop.get(TOPIC_TO_REGION_BINDINGS));
+ }
+ }
+
+ @Test
+ public void eachTaskHasUniqueTaskIds() {
GeodeKafkaSink sink = new GeodeKafkaSink();
Map<String, String> props = new HashMap();
+ props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
sink.start(props);
+ Collection<Map<String,String>> tasks = sink.taskConfigs(5);
+ HashSet<String> seenIds = new HashSet();
+ for(Map<String, String> taskProp : tasks) {
+ assertTrue(seenIds.add(taskProp.get(TASK_ID)));
+ }
}
}
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index d206252..ffcc3d8 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -29,7 +29,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
-import static org.apache.geode.pdx.internal.PeerTypeRegistration.REGION_NAME;
+import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -119,7 +119,7 @@ public class GeodeKafkaSourceTaskTest {
regionToTopicsMap.put("region1", new ArrayList());
GeodeSourceConnectorConfig config =
mock(GeodeSourceConnectorConfig.class);
- when (config.getRegionToTopics()).thenReturn(regionToTopicsMap);
+ when
(config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
task.installOnGeode(config, geodeContext, null, "someCqPrefix", false);
@@ -137,7 +137,7 @@ public class GeodeKafkaSourceTaskTest {
regionToTopicsMap.put("region1", new ArrayList());
GeodeSourceConnectorConfig config =
mock(GeodeSourceConnectorConfig.class);
- when (config.getRegionToTopics()).thenReturn(regionToTopicsMap);
+ when
(config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
task.installOnGeode(config, geodeContext, new LinkedBlockingQueue(),
"someCqPrefix", true);
@@ -191,9 +191,9 @@ public class GeodeKafkaSourceTaskTest {
List<String> regionNames = Arrays.asList(new String[]{"region1",
"region2", "region3"});
Map<String, Map<String,String>> sourcePartitions =
task.createSourcePartitionsMap(regionNames);
assertThat(3, is(sourcePartitions.size()));
- assertThat(true,
is(sourcePartitions.get("region1").get(REGION_NAME).equals("region1")));
- assertThat(true,
is(sourcePartitions.get("region2").get(REGION_NAME).equals("region2")));
- assertThat(true,
is(sourcePartitions.get("region3").get(REGION_NAME).equals("region3")));
+ assertThat(true,
is(sourcePartitions.get("region1").get(REGION_PARTITION).equals("region1")));
+ assertThat(true,
is(sourcePartitions.get("region2").get(REGION_PARTITION).equals("region2")));
+ assertThat(true,
is(sourcePartitions.get("region3").get(REGION_PARTITION).equals("region3")));
}
@Test
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
index badf319..3e61c77 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
@@ -14,8 +14,53 @@
*/
package geode.kafka.source;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static geode.kafka.GeodeConnectorConfig.TASK_ID;
+import static
geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
public class GeodeKafkaSourceTest {
+ @Test
+ public void taskConfigsCreatesMaxNumberOfTasks() {
+ GeodeKafkaSource source = new GeodeKafkaSource();
+ Map<String, String> props = new HashMap();
+ props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
+ source.start(props);
+ Collection<Map<String,String>> tasks = source.taskConfigs(5);
+ assertEquals(5, tasks.size());
+ }
+
+ @Test
+ public void sourceTaskConfigsAllAssignedEntireRegionToTopicBinding() {
+ GeodeKafkaSource source = new GeodeKafkaSource();
+ Map<String, String> props = new HashMap();
+ props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
+ source.start(props);
+ Collection<Map<String,String>> tasks = source.taskConfigs(5);
+ for(Map<String, String> prop : tasks) {
+ assertEquals("[someRegion:someTopic]",
prop.get(REGION_TO_TOPIC_BINDINGS));
+ }
+ }
+ @Test
+ public void eachTaskHasUniqueTaskIds() {
+ GeodeKafkaSource sink = new GeodeKafkaSource();
+ Map<String, String> props = new HashMap();
+ props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
+ sink.start(props);
+ Collection<Map<String,String>> tasks = sink.taskConfigs(5);
+ HashSet<String> seenIds = new HashSet();
+ for(Map<String, String> taskProp : tasks) {
+ assertTrue(seenIds.add(taskProp.get(TASK_ID)));
+ }
+ }
}