This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit bdff4a20bb24af20623055176e60a2f1b1c2a1e0 Author: Jason Huynh <[email protected]> AuthorDate: Thu Jan 30 14:43:03 2020 -0800 Separated out Configuration into source and sink where appropriate Added a few more tests --- build.gradle | 2 +- settings.gradle | 3 +- .../java/geode/kafka/GeodeConnectorConfig.java | 104 +++------------------ src/main/java/geode/kafka/GeodeContext.java | 36 +++++-- src/main/java/geode/kafka/sink/BatchRecords.java | 15 ++- src/main/java/geode/kafka/sink/GeodeKafkaSink.java | 22 +---- .../java/geode/kafka/sink/GeodeKafkaSinkTask.java | 18 ++-- .../geode/kafka/sink/GeodeSinkConnectorConfig.java | 35 +++++++ .../java/geode/kafka/source/GeodeKafkaSource.java | 28 +++--- .../geode/kafka/source/GeodeKafkaSourceTask.java | 27 +++--- .../kafka/source/GeodeSourceConnectorConfig.java | 104 +++++++++++++++++++++ .../java/geode/kafka/GeodeConnectorConfigTest.java | 15 +-- .../java/geode/kafka/WorkerAndHerderWrapper.java | 10 +- .../java/geode/kafka/sink/BatchRecordsTest.java | 59 ++++++++++-- .../geode/kafka/sink/GeodeKafkaSinkTaskTest.java | 4 + .../java/geode/kafka/sink/GeodeKafkaSinkTest.java | 16 ++++ .../kafka/source/GeodeKafkaSourceTaskTest.java | 13 ++- .../source/GeodeSourceConnectorConfigTest.java | 26 ++++++ 18 files changed, 350 insertions(+), 187 deletions(-) diff --git a/build.gradle b/build.gradle index a131d22..114b3cd 100644 --- a/build.gradle +++ b/build.gradle @@ -19,7 +19,6 @@ dependencies { compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0' compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0' - testCompile(group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.3.1') testCompile(group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: '1.1.0') testCompile(group: 'org.apache.curator', name: 'curator-framework', version: '4.2.0') @@ -31,3 +30,4 @@ dependencies { testImplementation 'org.awaitility:awaitility:4.0.2' } + diff --git a/settings.gradle 
b/settings.gradle index b5a2326..48eed3b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,2 +1 @@ -rootProject.name = 'geode-kafka-connector' - +rootProject.name = 'geode-kafka-connector' \ No newline at end of file diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java index dcc479e..396d428 100644 --- a/src/main/java/geode/kafka/GeodeConnectorConfig.java +++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java @@ -8,79 +8,26 @@ import java.util.stream.Collectors; public class GeodeConnectorConfig { - //Geode Configuration - public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId"; - public static final String DEFAULT_DURABLE_CLIENT_ID = ""; - public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout"; - public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000"; - //GeodeKafka Specific Configuration + /** + * Identifier for each task + */ public static final String TASK_ID = "GEODE_TASK_ID"; //One config per task - - public static final String CQ_PREFIX = "cqPrefix"; - public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka"; /** * Specifies which Locators to connect to Apache Geode */ public static final String LOCATORS = "locators"; public static final String DEFAULT_LOCATOR = "localhost[10334]"; + protected final int taskId; + protected List<LocatorHostPort> locatorHostPorts; - /** - * Specifies which Topics to connect in Kafka, uses the variable name with Kafka Sink Configuration - * Only used in sink configuration - */ - public static final String TOPICS = "topics"; - - //Used by sink - public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegion"; - - //Used by source - public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopic"; - - /** - * Property to describe the Source Partition in a record - */ - public static final String REGION_NAME = "regionName"; //used for Source Partition Events - - public static final String 
BATCH_SIZE = "geodeConnectorBatchSize"; - public static final String DEFAULT_BATCH_SIZE = "100"; - - public static final String QUEUE_SIZE = "geodeConnectorQueueSize"; - public static final String DEFAULT_QUEUE_SIZE = "100000"; - - public static final String LOAD_ENTIRE_REGION = "loadEntireRegion"; - public static final String DEFAULT_LOAD_ENTIRE_REGION = "false"; - - - private final int taskId; - private final String durableClientId; - private final String durableClientIdPrefix; - private final String durableClientTimeout; - - private Map<String, List<String>> regionToTopics; - private Map<String, List<String>> topicToRegions; - private List<LocatorHostPort> locatorHostPorts; - - //just for tests - GeodeConnectorConfig() { + protected GeodeConnectorConfig() { taskId = 0; - durableClientId = ""; - durableClientIdPrefix = ""; - durableClientTimeout = "0"; } public GeodeConnectorConfig(Map<String, String> connectorProperties) { taskId = Integer.parseInt(connectorProperties.get(TASK_ID)); - durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX); - if (isDurable(durableClientIdPrefix)) { - durableClientId = durableClientIdPrefix + taskId; - } else { - durableClientId = ""; - } - durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT); - regionToTopics = parseRegionToTopics(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS)); - topicToRegions = parseTopicToRegions(connectorProperties.get(GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS)); locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS)); } @@ -105,7 +52,7 @@ public class GeodeConnectorConfig { return bindings.stream().map(binding -> { String[] regionToTopicsArray = parseBinding(binding); return regionToTopicsArray; - }).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0], regionToTopicsArray -> parseNames(regionToTopicsArray[1]))); + }).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0], 
regionToTopicsArray -> parseStringByComma(regionToTopicsArray[1]))); } public static List<String> parseBindings(String bindings) { @@ -122,8 +69,12 @@ public class GeodeConnectorConfig { } //Used to parse a string of topics or regions - public static List<String> parseNames(String names) { - return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList()); + public static List<String> parseStringByComma(String string) { + return parseStringBy(string, ","); + } + + public static List<String> parseStringBy(String string, String regex) { + return Arrays.stream(string.split(regex)).map((s) -> s.trim()).collect(Collectors.toList()); } public static String reconstructString(Collection<String> strings) { @@ -144,40 +95,11 @@ public class GeodeConnectorConfig { return new LocatorHostPort(locator, port); } - public boolean isDurable() { - return isDurable(durableClientId); - } - - /** - * @param durableClientId or prefix can be passed in. Either both will be "" or both will have a value - * @return - */ - boolean isDurable(String durableClientId) { - return !durableClientId.equals(""); - } - public int getTaskId() { return taskId; } - public String getDurableClientId() { - return durableClientId; - } - - public String getDurableClientTimeout() { - return durableClientTimeout; - } - public List<LocatorHostPort> getLocatorHostPorts() { return locatorHostPorts; } - - public Map<String, List<String>> getRegionToTopics() { - return regionToTopics; - } - - public Map<String, List<String>> getTopicToRegions() { - return topicToRegions; - } - } diff --git a/src/main/java/geode/kafka/GeodeContext.java b/src/main/java/geode/kafka/GeodeContext.java index d1fd3ae..b7c3d27 100644 --- a/src/main/java/geode/kafka/GeodeContext.java +++ b/src/main/java/geode/kafka/GeodeContext.java @@ -2,6 +2,8 @@ package geode.kafka; import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientCacheFactory; +import 
org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.client.PoolManager; import org.apache.geode.cache.query.CqAttributes; import org.apache.geode.cache.query.CqException; import org.apache.geode.cache.query.CqExistsException; @@ -17,20 +19,42 @@ public class GeodeContext { private ClientCache clientCache; - public GeodeContext(GeodeConnectorConfig connectorConfig) { - clientCache = createClientCache(connectorConfig.getLocatorHostPorts(), connectorConfig.getDurableClientId(), connectorConfig.getDurableClientTimeout()); + public GeodeContext() { + } + + public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String durableClientId, String durableClientTimeout) { + clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout); + return clientCache; + } + + public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList) { + clientCache = createClientCache(locatorHostPortList, "", ""); + return clientCache; } public ClientCache getClientCache() { return clientCache; } + /** + * + * @param locators + * @param durableClientName + * @param durableClientTimeOut + * @return + */ public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) { - ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName) - .set("durable-client-timeout", durableClientTimeOut) - .setPoolSubscriptionEnabled(true); + ClientCacheFactory ccf = new ClientCacheFactory(); + if (!durableClientName.equals("")) { + ccf.set("durable-client-id", durableClientName) + .set("durable-client-timeout", durableClientTimeOut); + } + //currently we only allow using the default pool. 
+ //If we ever want to allow adding multiple pools we'll have to configure pool factories + ccf.setPoolSubscriptionEnabled(true); + for (LocatorHostPort locator: locators) { - ccf.addPoolLocator(locator.getHostName(), locator.getPort()).create(); + ccf.addPoolLocator(locator.getHostName(), locator.getPort()); } return ccf.create(); } diff --git a/src/main/java/geode/kafka/sink/BatchRecords.java b/src/main/java/geode/kafka/sink/BatchRecords.java index 742dcbc..282ba1c 100644 --- a/src/main/java/geode/kafka/sink/BatchRecords.java +++ b/src/main/java/geode/kafka/sink/BatchRecords.java @@ -4,6 +4,7 @@ import org.apache.geode.cache.Region; import org.apache.kafka.connect.sink.SinkRecord; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -13,8 +14,18 @@ import java.util.Map; */ public class BatchRecords { - private Map updateMap = new HashMap(); - private List removeList = new ArrayList(); + private Map updateMap; + private Collection removeList; + + public BatchRecords() { + this(new HashMap(), new ArrayList()); + } + + /** Used for tests**/ + public BatchRecords(Map updateMap, Collection removeList) { + this.updateMap = updateMap; + this.removeList = removeList; + } public void addRemoveOperation(SinkRecord record) { //if a previous operation added to the update map diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java index 99f9b9d..43f8eab 100644 --- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java +++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java @@ -11,19 +11,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE; -import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX; -import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE; -import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX; -import static 
geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID; -import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT; import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR; -import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE; -import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; -import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; import static geode.kafka.GeodeConnectorConfig.LOCATORS; -import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE; -import static geode.kafka.GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS; +import static geode.kafka.GeodeSinkConnectorConfig.DEFAULT_NULL_VALUES_MEAN_REMOVE; +import static geode.kafka.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE; +import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS; public class GeodeKafkaSink extends SinkConnector { private static final ConfigDef CONFIG_DEF = new ConfigDef(); @@ -44,7 +36,7 @@ public class GeodeKafkaSink extends SinkConnector { List<Map<String, String>> taskConfigs = new ArrayList<>(); Map<String, String> taskProps = new HashMap<>(); taskProps.putAll(sharedProps); - List<String> bindings = GeodeConnectorConfig.parseNames(taskProps.get(TOPIC_TO_REGION_BINDINGS)); + List<String> bindings = GeodeConnectorConfig.parseStringByComma(taskProps.get(TOPIC_TO_REGION_BINDINGS)); List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks); for (int i = 0; i < maxTasks; i++) { @@ -75,11 +67,7 @@ public class GeodeKafkaSink extends SinkConnector { private Map<String, String> computeMissingConfigurations(Map<String, String> props) { props.computeIfAbsent(LOCATORS, (key)-> DEFAULT_LOCATOR); - props.computeIfAbsent(DURABLE_CLIENT_TIME_OUT, (key) -> DEFAULT_DURABLE_CLIENT_TIMEOUT); - props.computeIfAbsent(DURABLE_CLIENT_ID_PREFIX, (key) -> DEFAULT_DURABLE_CLIENT_ID); - props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE); - 
props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE); - props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX); + props.computeIfAbsent(NULL_VALUES_MEAN_REMOVE, (key) -> DEFAULT_NULL_VALUES_MEAN_REMOVE); return props; } } diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java index 203192c..7f21134 100644 --- a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java +++ b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java @@ -1,7 +1,7 @@ package geode.kafka.sink; -import geode.kafka.GeodeConnectorConfig; import geode.kafka.GeodeContext; +import geode.kafka.GeodeSinkConnectorConfig; import org.apache.geode.cache.Region; import org.apache.geode.cache.client.ClientRegionShortcut; import org.apache.kafka.connect.sink.SinkRecord; @@ -9,13 +9,13 @@ import org.apache.kafka.connect.sink.SinkTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; + /** * TODO javaDoc * Currently force 1 region per task @@ -24,10 +24,10 @@ public class GeodeKafkaSinkTask extends SinkTask { private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSinkTask.class); - GeodeContext geodeContext; - Map<String, List<String>> topicToRegions; - Map<String, Region> regionNameToRegion; - boolean nullValuesMeansRemove = true; + private GeodeContext geodeContext; + private Map<String, List<String>> topicToRegions; + private Map<String, Region> regionNameToRegion; + private boolean nullValuesMeansRemove = true; /** * {@inheritDoc} @@ -41,11 +41,13 @@ public class GeodeKafkaSinkTask extends SinkTask { @Override public void start(Map<String, String> props) { try { - GeodeConnectorConfig geodeConnectorConfig = new GeodeConnectorConfig(props); + GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props); 
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting"); - geodeContext = new GeodeContext(geodeConnectorConfig); + geodeContext = new GeodeContext(); + geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts()); topicToRegions = geodeConnectorConfig.getTopicToRegions(); regionNameToRegion = createProxyRegions(topicToRegions.values()); + nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove(); } catch (Exception e) { e.printStackTrace(); logger.error("Unable to start sink task", e); diff --git a/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java new file mode 100644 index 0000000..7c6aa3e --- /dev/null +++ b/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java @@ -0,0 +1,35 @@ +package geode.kafka; + +import java.util.List; +import java.util.Map; + +public class GeodeSinkConnectorConfig extends GeodeConnectorConfig { + //Used by sink + public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegion"; + public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove"; + public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true"; + + private Map<String, List<String>> topicToRegions; + private final boolean nullValuesMeanRemove; + + //just for tests + GeodeSinkConnectorConfig() { + super(); + nullValuesMeanRemove = Boolean.parseBoolean(DEFAULT_NULL_VALUES_MEAN_REMOVE); + } + + public GeodeSinkConnectorConfig(Map<String, String> connectorProperties) { + super(connectorProperties); + topicToRegions = parseTopicToRegions(connectorProperties.get(TOPIC_TO_REGION_BINDINGS)); + nullValuesMeanRemove = Boolean.parseBoolean(connectorProperties.get(NULL_VALUES_MEAN_REMOVE)); + } + + public Map<String, List<String>> getTopicToRegions() { + return topicToRegions; + } + + public boolean getNullValuesMeanRemove() { + return nullValuesMeanRemove; + } + +} diff --git 
a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java index 91e6203..686359e 100644 --- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java +++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java @@ -12,21 +12,21 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE; -import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX; -import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE; -import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX; -import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID; -import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT; -import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOAD_ENTIRE_REGION; import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR; -import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE; -import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; -import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; -import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION; import static geode.kafka.GeodeConnectorConfig.LOCATORS; -import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE; -import static geode.kafka.GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS; +import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE; +import static geode.kafka.source.GeodeSourceConnectorConfig.CQ_PREFIX; +import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_BATCH_SIZE; +import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX; +import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_DURABLE_CLIENT_ID; +import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT; +import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_LOAD_ENTIRE_REGION; +import static 
geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_QUEUE_SIZE; +import static geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX; +import static geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_TIME_OUT; +import static geode.kafka.source.GeodeSourceConnectorConfig.LOAD_ENTIRE_REGION; +import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE; +import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS; public class GeodeKafkaSource extends SourceConnector { @@ -47,7 +47,7 @@ public class GeodeKafkaSource extends SourceConnector { Map<String, String> taskProps = new HashMap<>(); taskProps.putAll(sharedProps); - List<String> bindings = GeodeConnectorConfig.parseNames(taskProps.get(REGION_TO_TOPIC_BINDINGS)); + List<String> bindings = GeodeConnectorConfig.parseStringByComma(taskProps.get(REGION_TO_TOPIC_BINDINGS)); List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks); for (int i = 0; i < maxTasks; i++) { diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java index dadc8ba..c983a51 100644 --- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java +++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java @@ -19,12 +19,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; -import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE; -import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX; -import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE; -import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION; -import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE; -import static geode.kafka.GeodeConnectorConfig.REGION_NAME; +import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE; +import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE; +import 
static org.apache.geode.pdx.internal.PeerTypeRegistration.REGION_NAME; public class GeodeKafkaSourceTask extends SourceTask { @@ -37,7 +34,7 @@ public class GeodeKafkaSourceTask extends SourceTask { private static final Map<String, Long> OFFSET_DEFAULT = createOffset(); private GeodeContext geodeContext; - private GeodeConnectorConfig geodeConnectorConfig; + private GeodeSourceConnectorConfig geodeConnectorConfig; private Map<String, List<String>> regionToTopics; private Map<String, Map<String, String>> sourcePartitions; private BlockingQueue<GeodeEvent> eventBuffer; @@ -58,9 +55,11 @@ public class GeodeKafkaSourceTask extends SourceTask { @Override public void start(Map<String, String> props) { try { - geodeConnectorConfig = new GeodeConnectorConfig(props); + System.out.println("JASON start task"); + geodeConnectorConfig = new GeodeSourceConnectorConfig(props); logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting"); - geodeContext = new GeodeContext(geodeConnectorConfig); + geodeContext = new GeodeContext(); + geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout()); batchSize = Integer.parseInt(props.get(BATCH_SIZE)); int queueSize = Integer.parseInt(props.get(QUEUE_SIZE)); @@ -69,14 +68,18 @@ public class GeodeKafkaSourceTask extends SourceTask { regionToTopics = geodeConnectorConfig.getRegionToTopics(); sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet()); - String cqPrefix = props.get(CQ_PREFIX); - boolean loadEntireRegion = Boolean.parseBoolean(props.get(LOAD_ENTIRE_REGION)); + String cqPrefix = geodeConnectorConfig.getCqPrefix(); + boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion(); installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion); } catch (Exception e) { + System.out.println("JASON start task failed" + e); + e.printStackTrace(); 
logger.error("Unable to start source task", e); throw e; } + System.out.println("JASON end task"); + } @Override @@ -102,7 +105,7 @@ public class GeodeKafkaSourceTask extends SourceTask { geodeContext.getClientCache().close(true); } - void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) { + void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) { boolean isDurable = geodeConnectorConfig.isDurable(); int taskId = geodeConnectorConfig.getTaskId(); for (String region : geodeConnectorConfig.getRegionToTopics().keySet()) { diff --git a/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java new file mode 100644 index 0000000..c29048f --- /dev/null +++ b/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java @@ -0,0 +1,104 @@ +package geode.kafka.source; + +import geode.kafka.GeodeConnectorConfig; +import geode.kafka.LocatorHostPort; + +import java.util.List; +import java.util.Map; + +public class GeodeSourceConnectorConfig extends GeodeConnectorConfig { + + //Geode Configuration + public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId"; + public static final String DEFAULT_DURABLE_CLIENT_ID = ""; + public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout"; + public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000"; + + public static final String CQ_PREFIX = "cqPrefix"; + public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka"; + + /** + * Used as a key for source partitions + */ + public static final String REGION = "region"; + + public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopic"; + + public static final String BATCH_SIZE = "geodeConnectorBatchSize"; + public static final String DEFAULT_BATCH_SIZE = 
"100"; + + public static final String QUEUE_SIZE = "geodeConnectorQueueSize"; + public static final String DEFAULT_QUEUE_SIZE = "100000"; + + public static final String LOAD_ENTIRE_REGION = "loadEntireRegion"; + public static final String DEFAULT_LOAD_ENTIRE_REGION = "false"; + + private final String durableClientId; + private final String durableClientIdPrefix; + private final String durableClientTimeout; + private final String cqPrefix; + private final boolean loadEntireRegion; + + private Map<String, List<String>> regionToTopics; + + //just for tests + protected GeodeSourceConnectorConfig() { + super(); + durableClientId = ""; + durableClientIdPrefix = ""; + durableClientTimeout = "0"; + cqPrefix = DEFAULT_CQ_PREFIX; + loadEntireRegion = Boolean.parseBoolean(DEFAULT_LOAD_ENTIRE_REGION); + } + + public GeodeSourceConnectorConfig(Map<String, String> connectorProperties) { + super(connectorProperties); + regionToTopics = parseRegionToTopics(connectorProperties.get(REGION_TO_TOPIC_BINDINGS)); + durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX); + if (isDurable(durableClientIdPrefix)) { + durableClientId = durableClientIdPrefix + taskId; + } else { + durableClientId = ""; + } + durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT); + cqPrefix = connectorProperties.get(CQ_PREFIX); + loadEntireRegion = Boolean.parseBoolean(connectorProperties.get(LOAD_ENTIRE_REGION)); + } + + public boolean isDurable() { + return isDurable(durableClientId); + } + + /** + * @param durableClientId or prefix can be passed in. 
Either both will be "" or both will have a value + * @return + */ + boolean isDurable(String durableClientId) { + return !durableClientId.equals(""); + } + + public int getTaskId() { + return taskId; + } + + public String getDurableClientId() { + return durableClientId; + } + + public String getDurableClientTimeout() { + return durableClientTimeout; + } + + public String getCqPrefix() { + return cqPrefix; + } + + public boolean getLoadEntireRegion() { + return loadEntireRegion; + } + + public Map<String, List<String>> getRegionToTopics() { + return regionToTopics; + } + +} diff --git a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java index 6a39c5d..904b981 100644 --- a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java +++ b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java @@ -10,8 +10,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; -import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; import static geode.kafka.GeodeConnectorConfig.LOCATORS; import static geode.kafka.GeodeConnectorConfig.TASK_ID; import static org.hamcrest.CoreMatchers.allOf; @@ -26,7 +24,7 @@ public class GeodeConnectorConfigTest { @Test public void parseRegionNamesShouldSplitOnComma() { GeodeConnectorConfig config = new GeodeConnectorConfig(); - List<String> regionNames = config.parseNames("region1,region2,region3,region4"); + List<String> regionNames = config.parseStringByComma("region1,region2,region3,region4"); assertEquals(4, regionNames.size()); assertThat(true, allOf(is(regionNames.contains("region1")) , is(regionNames.contains("region2")) @@ -37,7 +35,7 @@ public class GeodeConnectorConfigTest { @Test public void parseRegionNamesShouldChomp() { GeodeConnectorConfig config = new GeodeConnectorConfig(); - List<String> regionNames = config.parseNames("region1, region2, region3,region4"); + 
List<String> regionNames = config.parseStringByComma("region1, region2, region3,region4"); assertEquals(4, regionNames.size()); assertThat(true, allOf(is(regionNames instanceof List) , is(regionNames.contains("region1")) @@ -135,14 +133,5 @@ public class GeodeConnectorConfigTest { */ - @Test - public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() { - Map<String, String> props = new HashMap<>(); - props.put(TASK_ID, "0"); - props.put(DURABLE_CLIENT_ID_PREFIX, ""); - props.put(LOCATORS, "localhost[10334]"); - GeodeConnectorConfig config = new GeodeConnectorConfig(props); - assertEquals("", config.getDurableClientId()); - } } diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java index 5e8f074..1b46fe0 100644 --- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java +++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java @@ -18,12 +18,11 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import static geode.kafka.GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS; -import static geode.kafka.GeodeConnectorConfig.TOPICS; -import static geode.kafka.GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS; import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS; import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK; import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS; +import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS; +import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS; public class WorkerAndHerderWrapper { @@ -34,9 +33,6 @@ public class WorkerAndHerderWrapper { // fast flushing for testing. 
props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10"); - - props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); - props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); props.put("internal.key.converter.schemas.enable", "false"); props.put("internal.value.converter.schemas.enable", "false"); props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); @@ -71,7 +67,7 @@ public class WorkerAndHerderWrapper { sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector"); sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); sinkProps.put(TOPIC_TO_REGION_BINDINGS, TEST_TOPIC_TO_REGION_BINDINGS); - sinkProps.put(TOPICS, TEST_TOPIC_FOR_SINK); + sinkProps.put("topics", TEST_TOPIC_FOR_SINK); herder.putConnectorConfig( sinkProps.get(ConnectorConfig.NAME_CONFIG), diff --git a/src/test/java/geode/kafka/sink/BatchRecordsTest.java b/src/test/java/geode/kafka/sink/BatchRecordsTest.java index 4907d74..593fea0 100644 --- a/src/test/java/geode/kafka/sink/BatchRecordsTest.java +++ b/src/test/java/geode/kafka/sink/BatchRecordsTest.java @@ -1,37 +1,82 @@ package geode.kafka.sink; +import org.apache.geode.cache.Region; +import org.apache.kafka.connect.sink.SinkRecord; import org.junit.Test; +import java.util.Collection; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class BatchRecordsTest { @Test public void updatingARecordShouldRemoveFromTheRemoveListIfNullValuesIsRemoveBooleanIsSet() { - + Map updates = mock(Map.class); + Collection removes = mock(Collection.class); + when(removes.contains(any())).thenReturn(true); + BatchRecords records = new BatchRecords(updates, removes); + SinkRecord sinkRecord = 
mock(SinkRecord.class); + records.addUpdateOperation(sinkRecord, true); + verify(removes, times(1)).remove(any()); } @Test public void updatingARecordShouldAddToTheUpdateMap() { - + Map updates = mock(Map.class); + Collection removes = mock(Collection.class); + when(removes.contains(any())).thenReturn(false); + BatchRecords records = new BatchRecords(updates, removes); + SinkRecord sinkRecord = mock(SinkRecord.class); + records.addUpdateOperation(sinkRecord, true); + verify(updates, times(1)).put(any(), any()); } @Test public void updatingARecordShouldNotRemoveFromTheRemoveListIfNullValuesIsNotSet() { - + boolean nullValuesMeanRemove = false; + Map updates = mock(Map.class); + Collection removes = mock(Collection.class); + when(removes.contains(any())).thenReturn(true); + BatchRecords records = new BatchRecords(updates, removes); + SinkRecord sinkRecord = mock(SinkRecord.class); + records.addUpdateOperation(sinkRecord, nullValuesMeanRemove); + verify(removes, times(0)).remove(any()); } @Test - public void removingARecordShouldRemoveFromTheUpdateMap() { - + public void removingARecordShouldRemoveFromTheUpdateMapIfKeyIsPresent() { + Map updates = mock(Map.class); + Collection removes = mock(Collection.class); + when(updates.containsKey(any())).thenReturn(true); + BatchRecords records = new BatchRecords(updates, removes); + SinkRecord sinkRecord = mock(SinkRecord.class); + records.addRemoveOperation(sinkRecord); + verify(updates, times(1)).remove(any()); } @Test public void removingARecordAddToTheRemoveCollection() { - + Map updates = mock(Map.class); + Collection removes = mock(Collection.class); + BatchRecords records = new BatchRecords(updates, removes); + SinkRecord sinkRecord = mock(SinkRecord.class); + records.addRemoveOperation(sinkRecord); + verify(removes, times(1)).add(any()); } @Test public void executeOperationsShouldInvokePutAllAndRemoveAll() { - + Region region = mock(Region.class); + BatchRecords records = new BatchRecords(); + 
records.executeOperations(region); + verify(region, times(1)).putAll(any()); + verify(region, times(1)).removeAll(any()); } diff --git a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java new file mode 100644 index 0000000..37131ba --- /dev/null +++ b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java @@ -0,0 +1,4 @@ +package geode.kafka.sink; + +public class GeodeKafkaSinkTaskTest { +} diff --git a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java new file mode 100644 index 0000000..2720e90 --- /dev/null +++ b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java @@ -0,0 +1,16 @@ +package geode.kafka.sink; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class GeodeKafkaSinkTest { + + @Test + public void test() { + GeodeKafkaSink sink = new GeodeKafkaSink(); + Map<String, String> props = new HashMap(); + sink.start(props); + } +} diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java index d4149db..b793d30 100644 --- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java +++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java @@ -1,6 +1,5 @@ package geode.kafka.source; -import geode.kafka.GeodeConnectorConfig; import geode.kafka.GeodeContext; import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.query.CqEvent; @@ -15,8 +14,8 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX; -import static geode.kafka.GeodeConnectorConfig.REGION_NAME; +import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX; +import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_NAME; import static 
org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -88,7 +87,7 @@ public class GeodeKafkaSourceTaskTest { GeodeContext geodeContext = mock(GeodeContext.class); when(geodeContext.getClientCache()).thenReturn(clientCache); - GeodeConnectorConfig config = mock(GeodeConnectorConfig.class); + GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class); when (config.isDurable()).thenReturn(true); GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); task.installOnGeode(config, geodeContext, null, "", false); @@ -105,7 +104,7 @@ public class GeodeKafkaSourceTaskTest { Map<String, List<String>> regionToTopicsMap = new HashMap<>(); regionToTopicsMap.put("region1", new ArrayList()); - GeodeConnectorConfig config = mock(GeodeConnectorConfig.class); + GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class); when (config.getRegionToTopics()).thenReturn(regionToTopicsMap); GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); @@ -123,7 +122,7 @@ public class GeodeKafkaSourceTaskTest { Map<String, List<String>> regionToTopicsMap = new HashMap<>(); regionToTopicsMap.put("region1", new ArrayList()); - GeodeConnectorConfig config = mock(GeodeConnectorConfig.class); + GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class); when (config.getRegionToTopics()).thenReturn(regionToTopicsMap); GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); @@ -138,7 +137,7 @@ public class GeodeKafkaSourceTaskTest { GeodeContext geodeContext = mock(GeodeContext.class); when(geodeContext.getClientCache()).thenReturn(clientCache); - GeodeConnectorConfig config = mock(GeodeConnectorConfig.class); + GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class); when (config.isDurable()).thenReturn(false); GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); task.installOnGeode(config, geodeContext, null, "", false); diff --git 
a/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java b/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java new file mode 100644 index 0000000..629c07d --- /dev/null +++ b/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java @@ -0,0 +1,26 @@ +package geode.kafka.source; + +import geode.kafka.GeodeConnectorConfig; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static geode.kafka.GeodeConnectorConfig.LOCATORS; +import static geode.kafka.GeodeConnectorConfig.TASK_ID; +import static geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX; +import static org.junit.Assert.assertEquals; + +public class GeodeSourceConnectorConfigTest { + + @Test + public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() { + Map<String, String> props = new HashMap<>(); + props.put(TASK_ID, "0"); + props.put(DURABLE_CLIENT_ID_PREFIX, ""); + props.put(LOCATORS, "localhost[10334]"); + GeodeSourceConnectorConfig config = new GeodeSourceConnectorConfig(props); + assertEquals("", config.getDurableClientId()); + } + +}
