This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 6d2ec84ccc0ad22a1e46be54fb7a4952bcfaa924 Author: Jason Huynh <[email protected]> AuthorDate: Wed Jan 22 21:35:00 2020 -0800 Added GeodeContext to handle cache related operations Added GeodeConnectorConfig for shared configs Moved packages --- .../java/geode/kafka/GeodeConnectorConfig.java | 132 ++++++++++++++ .../java/{ => geode}/kafka/LocatorHostPort.java | 2 +- .../kafka/sink}/GeodeKafkaSink.java | 2 +- .../{kafka => geode/kafka/source}/GeodeEvent.java | 2 +- .../kafka/source}/GeodeKafkaSource.java | 36 ++-- .../kafka/source}/GeodeKafkaSourceListener.java | 3 +- .../geode/kafka/source/GeodeKafkaSourceTask.java | 133 ++++++++++++++ src/main/java/kafka/GeodeConnectorConfig.java | 41 ----- src/main/java/kafka/GeodeKafkaSourceTask.java | 196 --------------------- .../{ => geode}/kafka/GeodeKafkaTestCluster.java | 3 +- .../java/{ => geode}/kafka/GeodeLocalCluster.java | 2 +- src/test/java/{ => geode}/kafka/JavaProcess.java | 2 +- .../java/{ => geode}/kafka/KafkaLocalCluster.java | 2 +- .../{ => geode}/kafka/LocatorLauncherWrapper.java | 2 +- .../{ => geode}/kafka/ServerLauncherWrapper.java | 2 +- .../{ => geode}/kafka/WorkerAndHerderCluster.java | 2 +- .../{ => geode}/kafka/WorkerAndHerderWrapper.java | 15 +- .../{ => geode}/kafka/ZooKeeperLocalCluster.java | 2 +- .../kafka/source}/GeodeKafkaSourceTaskTest.java | 37 +--- .../kafka/source}/GeodeKafkaSourceTest.java | 2 +- 20 files changed, 306 insertions(+), 312 deletions(-) diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java new file mode 100644 index 0000000..9ac561f --- /dev/null +++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java @@ -0,0 +1,132 @@ +package geode.kafka; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class GeodeConnectorConfig { + + //Geode Configuration + public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId"; + public static final String DEFAULT_DURABLE_CLIENT_ID = ""; + public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout"; + public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000"; + + + //GeodeKafka Specific Configuration + public static final String TASK_ID = "GEODE_TASK_ID"; //One config per task + + public static final String CQ_PREFIX = "cqPrefix"; + public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka"; + /** + * Specifies which Locators to connect to Apache Geode + */ + public static final String LOCATORS = "locators"; + public static final String DEFAULT_LOCATOR = "localhost[10334]"; + + /** + * Specifies which Regions to connect in Apache Geode + */ + public static final String REGIONS = "regions"; + + /** + * Specifies which Topics to connect in Kafka + */ + public static final String TOPICS = "topics"; + + /** + * Property to describe the Source Partition in a record + */ + public static final String REGION_NAME = "regionName"; //used for Source Partition Events + + public static final String BATCH_SIZE = "geodeConnectorBatchSize"; + public static final String DEFAULT_BATCH_SIZE = "100"; + + public static final String QUEUE_SIZE = "geodeConnectorQueueSize"; + public static final String DEFAULT_QUEUE_SIZE = "100000"; + + + + private final int taskId; + private final String durableClientId; + private final String durableClientIdPrefix; + private final String durableClientTimeout; + private List<String> regionNames; + private List<String> topics; + private List<LocatorHostPort> locatorHostPorts; + + //just for tests + GeodeConnectorConfig() { + taskId = 0; + durableClientId = ""; + durableClientIdPrefix = ""; + durableClientTimeout = "0"; + } + + public GeodeConnectorConfig(Map<String, String> connectorProperties) { + taskId = Integer.parseInt(connectorProperties.get(TASK_ID)); + durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX); + if (isDurable(durableClientIdPrefix)) { + durableClientId = durableClientIdPrefix + taskId; + } + else { + durableClientId = ""; + } + durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT); + regionNames = parseNames(connectorProperties.get(GeodeConnectorConfig.REGIONS)); + topics = parseNames(connectorProperties.get(GeodeConnectorConfig.TOPICS)); + locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS)); + } + + List<String> parseNames(String names) { + return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList()); + } + + List<LocatorHostPort> parseLocators(String locators) { + return Arrays.stream(locators.split(",")).map((s) -> { + String locatorString = s.trim(); + return parseLocator(locatorString); + }).collect(Collectors.toList()); + } + + private LocatorHostPort parseLocator(String locatorString) { + String[] splits = locatorString.split("\\["); + String locator = splits[0]; + int port = Integer.parseInt(splits[1].replace("]", "")); + return new LocatorHostPort(locator, port); + } + + public boolean isDurable() { + return isDurable(durableClientId); + } + + /** + * @param durableClientId or prefix can be passed in. Either both will be "" or both will have a value + * @return + */ + boolean isDurable(String durableClientId) { + return !durableClientId.equals(""); + } + + + public String getDurableClientId() { + return durableClientId; + } + + public String getDurableClientTimeout() { + return durableClientTimeout; + } + + public List<String> getRegionNames() { + return regionNames; + } + + public List<String> getTopics() { + return topics; + } + + public List<LocatorHostPort> getLocatorHostPorts() { + return locatorHostPorts; + } +} diff --git a/src/main/java/kafka/LocatorHostPort.java b/src/main/java/geode/kafka/LocatorHostPort.java similarity index 95% rename from src/main/java/kafka/LocatorHostPort.java rename to src/main/java/geode/kafka/LocatorHostPort.java index 517bad7..50d7440 100644 --- a/src/main/java/kafka/LocatorHostPort.java +++ b/src/main/java/geode/kafka/LocatorHostPort.java @@ -1,4 +1,4 @@ -package kafka; +package geode.kafka; public class LocatorHostPort { diff --git a/src/main/java/kafka/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java similarity index 98% rename from src/main/java/kafka/GeodeKafkaSink.java rename to src/main/java/geode/kafka/sink/GeodeKafkaSink.java index 67a244e..68460e4 100644 --- a/src/main/java/kafka/GeodeKafkaSink.java +++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java @@ -1,4 +1,4 @@ -package kafka; +package geode.kafka.sink; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; diff --git a/src/main/java/kafka/GeodeEvent.java b/src/main/java/geode/kafka/source/GeodeEvent.java similarity index 94% rename from src/main/java/kafka/GeodeEvent.java rename to src/main/java/geode/kafka/source/GeodeEvent.java index 805f4a0..41e37c6 100644 --- a/src/main/java/kafka/GeodeEvent.java +++ b/src/main/java/geode/kafka/source/GeodeEvent.java @@ -1,4 +1,4 @@ -package kafka; +package geode.kafka.source; import org.apache.geode.cache.query.CqEvent; diff --git a/src/main/java/kafka/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java similarity index 67% rename from src/main/java/kafka/GeodeKafkaSource.java rename to src/main/java/geode/kafka/source/GeodeKafkaSource.java index 95afc50..d5da62a 100644 --- a/src/main/java/kafka/GeodeKafkaSource.java +++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java @@ -1,5 +1,6 @@ -package kafka; +package geode.kafka.source; +import geode.kafka.GeodeConnectorConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; @@ -10,21 +11,19 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static kafka.GeodeConnectorConfig.BATCH_SIZE; -import static kafka.GeodeConnectorConfig.CQ_PREFIX; -import static kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE; -import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX; -import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID; -import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT; -import static kafka.GeodeConnectorConfig.DEFAULT_LOCATOR; -import static kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE; -import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; -import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; -import static kafka.GeodeConnectorConfig.LOCATORS; -import static kafka.GeodeConnectorConfig.QUEUE_SIZE; -import static kafka.GeodeConnectorConfig.REGIONS; -import static kafka.GeodeConnectorConfig.TOPICS; -import static kafka.GeodeKafkaSourceTask.TASK_ID; +import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE; +import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE; +import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; +import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; +import static geode.kafka.GeodeConnectorConfig.LOCATORS; +import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE; + public class GeodeKafkaSource extends SourceConnector { @@ -39,16 +38,14 @@ public class GeodeKafkaSource extends SourceConnector { @Override public List<Map<String, String>> taskConfigs(int maxTasks) { - System.out.println("GKSource: taskConfigs"); List<Map<String, String>> taskConfigs = new ArrayList<>(); Map<String, String> taskProps = new HashMap<>(); taskProps.putAll(sharedProps); - // use the same props for all tasks at the moment for (int i = 0; i < maxTasks; i++) { //TODO partition regions and topics - taskProps.put(TASK_ID, "" + i); + taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i); taskConfigs.add(taskProps); } return taskConfigs; @@ -82,6 +79,7 @@ public class GeodeKafkaSource extends SourceConnector { @Override public String version() { + //TODO return AppInfoParser.getVersion(); } diff --git a/src/main/java/kafka/GeodeKafkaSourceListener.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java similarity index 95% rename from src/main/java/kafka/GeodeKafkaSourceListener.java rename to src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java index 4c0e729..c4d6b22 100644 --- a/src/main/java/kafka/GeodeKafkaSourceListener.java +++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java @@ -1,7 +1,6 @@ -package kafka; +package geode.kafka.source; import org.apache.geode.cache.query.CqEvent; -import org.apache.geode.cache.query.CqListener; import org.apache.geode.cache.query.CqStatusListener; import java.util.concurrent.BlockingQueue; diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java new file mode 100644 index 0000000..23fa141 --- /dev/null +++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java @@ -0,0 +1,133 @@ +package geode.kafka.source; + +import geode.kafka.GeodeContext; +import geode.kafka.GeodeConnectorConfig; +import org.apache.geode.cache.query.CqAttributes; +import org.apache.geode.cache.query.CqAttributesFactory; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; + +import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE; +import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX; +import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE; +import static geode.kafka.GeodeConnectorConfig.REGION_NAME; + +public class GeodeKafkaSourceTask extends SourceTask { + + private static final String TASK_PREFIX = "TASK"; + private static final String DOT = "."; + + //property string to pass in to identify task + private static final Map<String, Long> OFFSET_DEFAULT = createOffset(); + + private int taskId; + private GeodeContext geodeContext; + private List<String> topics; + + private Map<String, Map<String, String>> sourcePartitions; + private static BlockingQueue<GeodeEvent> eventBuffer; + private int batchSize; + + + private static Map<String, Long> createOffset() { + Map<String, Long> offset = new HashMap<>(); + offset.put("OFFSET", 0L); + return offset; + } + + @Override + public String version() { + return null; + } + + @Override + public void start(Map<String, String> props) { + try { + GeodeConnectorConfig geodeConnectorConfig = new GeodeConnectorConfig(props); + geodeContext = new GeodeContext(geodeConnectorConfig); + + batchSize = Integer.parseInt(props.get(BATCH_SIZE)); + int queueSize = Integer.parseInt(props.get(QUEUE_SIZE)); + eventBuffer = new LinkedBlockingQueue<>(queueSize); + + sourcePartitions = createSourcePartitionsMap(geodeConnectorConfig.getRegionNames()); + topics = geodeConnectorConfig.getTopics(); + + String cqPrefix = props.get(CQ_PREFIX); + + installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix); + } + catch (Exception e) { + System.out.println("Exception:" + e); + e.printStackTrace(); + throw e; + } + } + + @Override + public List<SourceRecord> poll() throws InterruptedException { + ArrayList<SourceRecord> records = new ArrayList<>(batchSize); + ArrayList<GeodeEvent> events = new ArrayList<>(batchSize); + if (eventBuffer.drainTo(events, batchSize) > 0) { + for (GeodeEvent event : events) { + for (String topic : topics) { + records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent())); + } + } + + return records; + } + + return null; + } + + @Override + public void stop() { + geodeContext.getClientCache().close(true); + } + + void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix) { + boolean isDurable = geodeConnectorConfig.isDurable(); + for (String region : geodeConnectorConfig.getRegionNames()) { + installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, isDurable); + } + if (isDurable) { + geodeContext.getClientCache().readyForEvents(); + } + } + + void installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) { + CqAttributesFactory cqAttributesFactory = new CqAttributesFactory(); + cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName)); + CqAttributes cqAttributes = cqAttributesFactory.create(); + geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes, + isDurable); + } + + /** + * converts a list of regions names into a map of source partitions + * + * @param regionNames list of regionNames + * @return Map<String, Map < String, String>> a map of source partitions, keyed by region name + */ + Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) { + return regionNames.stream().map(regionName -> { + Map<String, String> sourcePartition = new HashMap<>(); + sourcePartition.put(REGION_NAME, regionName); + return sourcePartition; + }).collect(Collectors.toMap(s -> s.get(REGION_NAME), s -> s)); + } + + String generateCqName(int taskId, String cqPrefix, String regionName) { + return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName; + } + +} diff --git a/src/main/java/kafka/GeodeConnectorConfig.java b/src/main/java/kafka/GeodeConnectorConfig.java deleted file mode 100644 index 003367d..0000000 --- a/src/main/java/kafka/GeodeConnectorConfig.java +++ /dev/null @@ -1,41 +0,0 @@ -package kafka; - -public class GeodeConnectorConfig { - - //Geode Configuration - public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId"; - public static final String DEFAULT_DURABLE_CLIENT_ID = ""; - public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout"; - public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000"; - - - //GeodeKafka Specific Configuration - public static final String CQ_PREFIX = "cqPrefix"; - public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka"; - /** - * Specifies which Locators to connect to Apache Geode - */ - public static final String LOCATORS = "locators"; - public static final String DEFAULT_LOCATOR = "localhost[10334]"; - - /** - * Specifies which Regions to connect in Apache Geode - */ - public static final String REGIONS = "regions"; - - /** - * Specifies which Topics to connect in Kafka - */ - public static final String TOPICS = "topics"; - - /** - * Property to describe the Source Partition in a record - */ - public static final String REGION_NAME = "regionName"; //used for Source Partition Events - - public static final String BATCH_SIZE = "geodeConnectorBatchSize"; - public static final String DEFAULT_BATCH_SIZE = "100"; - - public static final String QUEUE_SIZE = "geodeConnectorQueueSize"; - public static final String DEFAULT_QUEUE_SIZE = "100000"; -} diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java deleted file mode 100644 index 896b602..0000000 --- a/src/main/java/kafka/GeodeKafkaSourceTask.java +++ /dev/null @@ -1,196 +0,0 @@ -package kafka; - -import org.apache.geode.cache.client.ClientCache; -import org.apache.geode.cache.client.ClientCacheFactory; -import org.apache.geode.cache.query.CqAttributes; -import org.apache.geode.cache.query.CqAttributesFactory; -import org.apache.geode.cache.query.CqException; -import org.apache.geode.cache.query.CqExistsException; -import org.apache.geode.cache.query.RegionNotFoundException; -import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.source.SourceTask; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.stream.Collectors; - -import static kafka.GeodeConnectorConfig.BATCH_SIZE; -import static kafka.GeodeConnectorConfig.CQ_PREFIX; -import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX; -import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID; -import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT; -import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; -import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; -import static kafka.GeodeConnectorConfig.QUEUE_SIZE; -import static kafka.GeodeConnectorConfig.REGION_NAME; - -public class GeodeKafkaSourceTask extends SourceTask { - - //property string to pass in to identify task - public static final String TASK_ID = "GEODE_TASK_ID"; - private static final String TASK_PREFIX = "TASK"; - private static final String DOT = "."; - private static final Map<String, Long> OFFSET_DEFAULT = createOffset(); - - private int taskId; - private ClientCache clientCache; - private List<String> regionNames; - private List<String> topics; - private Map<String, Map<String, String>> sourcePartitions; - private static BlockingQueue<GeodeEvent> eventBuffer; - private int batchSize; - - - private static Map<String, Long> createOffset() { - Map<String, Long> offset = new HashMap<>(); - offset.put("OFFSET", 0L); - return offset; - } - - @Override - public String version() { - return null; - } - - @Override - public void start(Map<String, String> props) { - try { - System.out.println("JASON task start"); - taskId = Integer.parseInt(props.get(TASK_ID)); - batchSize = Integer.parseInt(props.get(BATCH_SIZE)); - int queueSize = Integer.parseInt(props.get(QUEUE_SIZE)); - eventBuffer = new LinkedBlockingQueue<>(queueSize); - - //grouping will be done in the source and not the task - regionNames = parseNames(props.get(GeodeConnectorConfig.REGIONS)); - topics = parseNames(props.get(GeodeConnectorConfig.TOPICS)); - sourcePartitions = createSourcePartitionsMap(regionNames); - - String durableClientId = props.get(DURABLE_CLIENT_ID_PREFIX); - if (!durableClientId.equals("")) { - durableClientId += taskId; - } - System.out.println("JASON durable client id is:" + durableClientId); - String durableClientTimeout = props.get(DURABLE_CLIENT_TIME_OUT); - String cqPrefix = props.get(CQ_PREFIX); - - List<LocatorHostPort> locators = parseLocators(props.get(GeodeConnectorConfig.LOCATORS)); - installOnGeode(taskId, eventBuffer, locators, regionNames, durableClientId, durableClientTimeout, cqPrefix); - System.out.println("JASON task start finished"); - } - catch (Exception e) { - System.out.println("Exception:" + e); - e.printStackTrace(); - throw e; - } - } - - @Override - public List<SourceRecord> poll() throws InterruptedException { - ArrayList<SourceRecord> records = new ArrayList<>(batchSize); - ArrayList<GeodeEvent> events = new ArrayList<>(batchSize); - if (eventBuffer.drainTo(events, batchSize) > 0) { - for (GeodeEvent event : events) { - for (String topic : topics) { - records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent())); -// records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, "STRING")); - } - } - - return records; - } - - return null; - } - - @Override - public void stop() { - clientCache.close(true); - } - - ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) { - ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName) - .set("durable-client-timeout", durableClientTimeOut) - .setPoolSubscriptionEnabled(true); - for (LocatorHostPort locator: locators) { - ccf.addPoolLocator(locator.getHostName(), locator.getPort()).create(); - } - return ccf.create(); - } - - void installOnGeode(int taskId, BlockingQueue<GeodeEvent> eventBuffer, List<LocatorHostPort> locators, List<String> regionNames, String durableClientId, String durableClientTimeout, String cqPrefix) { - boolean isDurable = isDurable(durableClientId); - - clientCache = createClientCache(locators, durableClientId, durableClientTimeout); - for (String region : regionNames) { - installListenersToRegion(taskId, eventBuffer, region, cqPrefix, isDurable); - } - if (isDurable) { - clientCache.readyForEvents(); - } - } - - void installListenersToRegion(int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) { - CqAttributesFactory cqAttributesFactory = new CqAttributesFactory(); - cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName)); - CqAttributes cqAttributes = cqAttributesFactory.create(); - try { - clientCache.getQueryService().newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes, - isDurable).execute(); - } catch (CqExistsException e) { - System.out.println("UHH"); - e.printStackTrace(); - } catch (CqException | RegionNotFoundException e) { - System.out.println("UHH e"); - e.printStackTrace(); - } catch (Exception e) { - System.out.println("UHHHHHH " + e); - } - } - - - List<String> parseNames(String names) { - return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList()); - } - - List<LocatorHostPort> parseLocators(String locators) { - return Arrays.stream(locators.split(",")).map((s) -> { - String locatorString = s.trim(); - return parseLocator(locatorString); - }).collect(Collectors.toList()); - } - - private LocatorHostPort parseLocator(String locatorString) { - String[] splits = locatorString.split("\\["); - String locator = splits[0]; - int port = Integer.parseInt(splits[1].replace("]", "")); - return new LocatorHostPort(locator, port); - } - - boolean isDurable(String durableClientId) { - return !durableClientId.equals(""); - } - - String generateCqName(int taskId, String cqPrefix, String regionName) { - return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName; - } - - /** - * converts a list of regions names into a map of source partitions - * - * @param regionNames list of regionNames - * @return Map<String, Map < String, String>> a map of source partitions, keyed by region name - */ - Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) { - return regionNames.stream().map(regionName -> { - Map<String, String> sourcePartition = new HashMap<>(); - sourcePartition.put(REGION_NAME, regionName); - return sourcePartition; - }).collect(Collectors.toMap(s -> s.get(REGION_NAME), s -> s)); - } -} diff --git a/src/test/java/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java similarity index 99% rename from src/test/java/kafka/GeodeKafkaTestCluster.java rename to src/test/java/geode/kafka/GeodeKafkaTestCluster.java index 85c0fc4..f1a6dff 100644 --- a/src/test/java/kafka/GeodeKafkaTestCluster.java +++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java @@ -1,6 +1,7 @@ -package kafka; +package geode.kafka; import kafka.admin.RackAwareMode; +import geode.kafka.source.GeodeKafkaSource; import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; import org.apache.geode.cache.Region; diff --git a/src/test/java/kafka/GeodeLocalCluster.java b/src/test/java/geode/kafka/GeodeLocalCluster.java similarity index 97% rename from src/test/java/kafka/GeodeLocalCluster.java rename to src/test/java/geode/kafka/GeodeLocalCluster.java index 43ac8f5..afd3b9d 100644 --- a/src/test/java/kafka/GeodeLocalCluster.java +++ b/src/test/java/geode/kafka/GeodeLocalCluster.java @@ -1,4 +1,4 @@ -package kafka; +package geode.kafka; import java.io.IOException; diff --git a/src/test/java/kafka/JavaProcess.java b/src/test/java/geode/kafka/JavaProcess.java similarity index 98% rename from src/test/java/kafka/JavaProcess.java rename to src/test/java/geode/kafka/JavaProcess.java index 30edfef..fe00094 100644 --- a/src/test/java/kafka/JavaProcess.java +++ b/src/test/java/geode/kafka/JavaProcess.java @@ -1,4 +1,4 @@ -package kafka; +package geode.kafka; import java.io.File; import java.io.IOException; diff --git a/src/test/java/kafka/KafkaLocalCluster.java b/src/test/java/geode/kafka/KafkaLocalCluster.java similarity index 96% rename from src/test/java/kafka/KafkaLocalCluster.java rename to src/test/java/geode/kafka/KafkaLocalCluster.java index cd2a3df..57f16f4 100644 --- a/src/test/java/kafka/KafkaLocalCluster.java +++ b/src/test/java/geode/kafka/KafkaLocalCluster.java @@ -1,4 +1,4 @@ -package kafka; +package geode.kafka; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; diff --git a/src/test/java/kafka/LocatorLauncherWrapper.java b/src/test/java/geode/kafka/LocatorLauncherWrapper.java similarity index 98% rename from src/test/java/kafka/LocatorLauncherWrapper.java rename to src/test/java/geode/kafka/LocatorLauncherWrapper.java index c1a7075..57ff405 100644 --- a/src/test/java/kafka/LocatorLauncherWrapper.java +++ b/src/test/java/geode/kafka/LocatorLauncherWrapper.java @@ -1,4 +1,4 @@ -package kafka; +package geode.kafka; import org.apache.geode.distributed.ConfigurationProperties; import org.apache.geode.distributed.Locator; diff --git a/src/test/java/kafka/ServerLauncherWrapper.java b/src/test/java/geode/kafka/ServerLauncherWrapper.java similarity index 99% rename from src/test/java/kafka/ServerLauncherWrapper.java rename to src/test/java/geode/kafka/ServerLauncherWrapper.java index 933824e..b36a3aa 100644 --- a/src/test/java/kafka/ServerLauncherWrapper.java +++ b/src/test/java/geode/kafka/ServerLauncherWrapper.java @@ -1,4 +1,4 @@ -package kafka; +package geode.kafka; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; diff --git a/src/test/java/kafka/WorkerAndHerderCluster.java b/src/test/java/geode/kafka/WorkerAndHerderCluster.java similarity index 95% rename from src/test/java/kafka/WorkerAndHerderCluster.java rename to src/test/java/geode/kafka/WorkerAndHerderCluster.java index 7357232..c347946 100644 --- a/src/test/java/kafka/WorkerAndHerderCluster.java +++ b/src/test/java/geode/kafka/WorkerAndHerderCluster.java @@ -1,4 +1,4 @@ -package kafka; +package geode.kafka; import java.io.IOException; diff --git a/src/test/java/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java similarity index 89% rename from src/test/java/kafka/WorkerAndHerderWrapper.java rename to src/test/java/geode/kafka/WorkerAndHerderWrapper.java index 5f8ccd2..cc8e27b 100644 --- a/src/test/java/kafka/WorkerAndHerderWrapper.java +++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java @@ -1,7 +1,6 @@ -package kafka; +package geode.kafka; -import org.apache.geode.distributed.ConfigurationProperties; -import org.apache.geode.distributed.Locator; +import geode.kafka.source.GeodeKafkaSource; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.runtime.ConnectorConfig; @@ -14,16 +13,14 @@ import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; import org.apache.kafka.connect.util.ConnectUtils; -import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.Properties; -import static kafka.GeodeConnectorConfig.REGIONS; -import static kafka.GeodeConnectorConfig.TOPICS; -import static kafka.GeodeKafkaTestCluster.TEST_REGIONS; -import static kafka.GeodeKafkaTestCluster.TEST_TOPICS; +import static geode.kafka.GeodeConnectorConfig.REGIONS; +import static geode.kafka.GeodeConnectorConfig.TOPICS; +import static geode.kafka.GeodeKafkaTestCluster.TEST_REGIONS; +import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPICS; public class WorkerAndHerderWrapper { diff --git a/src/test/java/kafka/ZooKeeperLocalCluster.java b/src/test/java/geode/kafka/ZooKeeperLocalCluster.java similarity index 98% rename from src/test/java/kafka/ZooKeeperLocalCluster.java rename to src/test/java/geode/kafka/ZooKeeperLocalCluster.java index 8b23f53..a3d3433 100644 --- a/src/test/java/kafka/ZooKeeperLocalCluster.java +++ b/src/test/java/geode/kafka/ZooKeeperLocalCluster.java @@ -1,4 +1,4 @@ -package kafka; +package geode.kafka; import org.apache.zookeeper.server.ServerConfig; import org.apache.zookeeper.server.ZooKeeperServerMain; diff --git a/src/test/java/kafka/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java similarity index 57% rename from src/test/java/kafka/GeodeKafkaSourceTaskTest.java rename to src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java index 2c15664..33c260d 100644 --- a/src/test/java/kafka/GeodeKafkaSourceTaskTest.java +++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java @@ -1,5 +1,6 @@ -package kafka; +package geode.kafka.source; +import geode.kafka.GeodeConnectorConfig; import org.junit.Test; import java.util.Arrays; @@ -7,10 +8,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static kafka.GeodeConnectorConfig.REGION_NAME; -import static org.hamcrest.CoreMatchers.allOf; +import static geode.kafka.GeodeConnectorConfig.REGION_NAME; import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; public class GeodeKafkaSourceTaskTest { @@ -37,28 +36,7 @@ public class GeodeKafkaSourceTaskTest { - @Test - public void parseRegionNamesShouldSplitOnComma() { - GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); - List<String> regionNames = task.parseNames("region1,region2,region3,region4"); - assertEquals(4, regionNames.size()); - assertThat(true, allOf(is(regionNames.contains("region1")) - , is(regionNames.contains("region2")) - , is(regionNames.contains("region3")) - , is(regionNames.contains("region4")))); - } - @Test - public void parseRegionNamesShouldChomp() { - GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); - List<String> regionNames = task.parseNames("region1, region2, region3,region4"); - assertEquals(4, regionNames.size()); - assertThat(true, allOf(is(regionNames instanceof List) - , is(regionNames.contains("region1")) - , is(regionNames.contains("region2")) - , is(regionNames.contains("region3")) - , is(regionNames.contains("region4")))); - } @Test public void createSourcePartitionsShouldReturnAMapOfSourcePartitions() { @@ -71,13 +49,7 @@ public class GeodeKafkaSourceTaskTest { assertThat(true, is(sourcePartitions.get("region3").get(REGION_NAME).equals("region3"))); } - @Test - public void shouldBeAbleToParseGeodeLocatorStrings() { - GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); - String locatorString="localhost[8888], localhost[8881]"; - List<LocatorHostPort> locators = task.parseLocators(locatorString); - assertThat(2, is(locators.size())); - } + @Test public void listOfLocatorsShouldBeConfiguredIntoClientCache() { @@ -101,7 +73,6 @@ public class GeodeKafkaSourceTaskTest { props.put(GeodeConnectorConfig.QUEUE_SIZE, GeodeConnectorConfig.DEFAULT_QUEUE_SIZE); props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE); - GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); task.start(props); diff --git a/src/test/java/kafka/GeodeKafkaSourceTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java similarity index 93% rename from src/test/java/kafka/GeodeKafkaSourceTest.java rename to src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java index ec6dff8..717d495 100644 --- a/src/test/java/kafka/GeodeKafkaSourceTest.java +++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java @@ -1,4 +1,4 @@ -package kafka; +package geode.kafka.source; import org.junit.Test;
