This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 9e4bc00d57b5d801432015ec26d4bfafed670552
Author: Jason Huynh <[email protected]>
AuthorDate: Wed Jan 22 21:36:32 2020 -0800

    Added GeodeContext (for reals this time)
    Added GeodeConnectorConfigTest
---
 src/main/java/geode/kafka/GeodeContext.java        | 47 ++++++++++++++++++++++
 .../java/geode/kafka/GeodeConnectorConfigTest.java | 44 ++++++++++++++++++++
 2 files changed, 91 insertions(+)

diff --git a/src/main/java/geode/kafka/GeodeContext.java b/src/main/java/geode/kafka/GeodeContext.java
new file mode 100644
index 0000000..4582b93
--- /dev/null
+++ b/src/main/java/geode/kafka/GeodeContext.java
@@ -0,0 +1,47 @@
+package geode.kafka;
+
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqException;
+import org.apache.geode.cache.query.CqExistsException;
+import org.apache.geode.cache.query.CqQuery;
+import org.apache.geode.cache.query.RegionNotFoundException;
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.List;
+
+public class GeodeContext {
+
+    private ClientCache clientCache;
+
+
+    public GeodeContext(GeodeConnectorConfig connectorConfig) {
+        clientCache = createClientCache(connectorConfig.getLocatorHostPorts(), connectorConfig.getDurableClientId(), connectorConfig.getDurableClientTimeout());
+    }
+
+    public ClientCache getClientCache() {
+        return clientCache;
+    }
+
+    public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) {
+        ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName)
+                .set("durable-client-timeout", durableClientTimeOut)
+                .setPoolSubscriptionEnabled(true);
+        for (LocatorHostPort locator: locators) {
+            ccf.addPoolLocator(locator.getHostName(), locator.getPort()).create();
+        }
+        return ccf.create();
+    }
+
+    public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boolean isDurable) throws ConnectException {
+        try {
+            CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
+            cq.execute();
+            return cq;
+        } catch (RegionNotFoundException | CqException | CqExistsException e) {
+            e.printStackTrace();
+            throw new ConnectException(e);
+        }
+    }
+}
diff --git a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
new file mode 100644
index 0000000..54f9e52
--- /dev/null
+++ b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
@@ -0,0 +1,44 @@
+package geode.kafka;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+public class GeodeConnectorConfigTest {
+
+    @Test
+    public void parseRegionNamesShouldSplitOnComma() {
+        GeodeConnectorConfig config = new GeodeConnectorConfig();
+        List<String> regionNames = config.parseNames("region1,region2,region3,region4");
+        assertEquals(4, regionNames.size());
+        assertThat(true, allOf(is(regionNames.contains("region1"))
+                , is(regionNames.contains("region2"))
+                , is(regionNames.contains("region3"))
+                , is(regionNames.contains("region4"))));
+    }
+
+    @Test
+    public void parseRegionNamesShouldChomp() {
+        GeodeConnectorConfig config = new GeodeConnectorConfig();
+        List<String> regionNames = config.parseNames("region1, region2, region3,region4");
+        assertEquals(4, regionNames.size());
+        assertThat(true, allOf(is(regionNames instanceof List)
+                , is(regionNames.contains("region1"))
+                , is(regionNames.contains("region2"))
+                , is(regionNames.contains("region3"))
+                , is(regionNames.contains("region4"))));
+    }
+
+    @Test
+    public void shouldBeAbleToParseGeodeLocatorStrings() {
+        GeodeConnectorConfig config = new GeodeConnectorConfig();
+        String locatorString="localhost[8888], localhost[8881]";
+        List<LocatorHostPort> locators = config.parseLocators(locatorString);
+        assertThat(2, is(locators.size()));
+    }
+}
