This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit d5114b511795cb29638aa35e01cd40475442a935
Author: Jason Huynh <[email protected]>
AuthorDate: Wed Jan 29 16:49:40 2020 -0800

    Adding missing classes
---
 src/main/java/geode/kafka/sink/BatchRecords.java  |  48 +++++++++
 .../java/geode/kafka/sink/GeodeKafkaSinkTask.java | 109 +++++++++++++++++++++
 .../java/geode/kafka/sink/BatchRecordsTest.java   |  39 ++++++++
 3 files changed, 196 insertions(+)

diff --git a/src/main/java/geode/kafka/sink/BatchRecords.java b/src/main/java/geode/kafka/sink/BatchRecords.java
new file mode 100644
index 0000000..742dcbc
--- /dev/null
+++ b/src/main/java/geode/kafka/sink/BatchRecords.java
@@ -0,0 +1,48 @@
+package geode.kafka.sink;
+
+import org.apache.geode.cache.Region;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A collection of records to put/remove from a region
+ */
+public class BatchRecords {
+
+  private Map updateMap = new HashMap();
+  private List removeList = new ArrayList();
+
+  public void addRemoveOperation(SinkRecord record) {
+    // If a previous operation added this key to the update map,
+    // just remove it so we don't do a put and then a remove.
+    // Depending on the order of operations (putAll then removeAll, or removeAll then putAll)...
+    // ...we could remove one of the if statements.
+    if (updateMap.containsKey(record.key())) {
+      updateMap.remove(record.key());
+    } else {
+      removeList.add(record.key());
+    }
+  }
+
+  public void addUpdateOperation(SinkRecord record, boolean nullValuesMeansRemove) {
+    // It's assumed the records arrive in order. If a previous record queued this key
+    // for removal, a later update for the same key cancels that removal.
+    if (nullValuesMeansRemove) {
+      if (removeList.contains(record.key())) {
+        removeList.remove(record.key());
+      }
+    }
+    updateMap.put(record.key(), record.value());
+  }
+
+
+  public void executeOperations(Region region) {
+    region.putAll(updateMap);
+    region.removeAll(removeList);
+  }
+}
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
new file mode 100644
index 0000000..203192c
--- /dev/null
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -0,0 +1,109 @@
+package geode.kafka.sink;
+
+import geode.kafka.GeodeConnectorConfig;
+import geode.kafka.GeodeContext;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * TODO javaDoc
+ * Currently force 1 region per task
+ */
+public class GeodeKafkaSinkTask extends SinkTask {
+
+  private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSinkTask.class);
+
+  GeodeContext geodeContext;
+  Map<String, List<String>> topicToRegions;
+  Map<String, Region> regionNameToRegion;
+  boolean nullValuesMeansRemove = true;
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String version() {
+    // TODO
+    return "unknown";
+  }
+
+  @Override
+  public void start(Map<String, String> props) {
+    try {
+      GeodeConnectorConfig geodeConnectorConfig = new GeodeConnectorConfig(props);
+      logger.debug("GeodeKafkaSinkTask id:" + geodeConnectorConfig.getTaskId() + " starting");
+      geodeContext = new GeodeContext(geodeConnectorConfig);
+      topicToRegions = geodeConnectorConfig.getTopicToRegions();
+      regionNameToRegion = createProxyRegions(topicToRegions.values());
+    } catch (Exception e) {
+      e.printStackTrace();
+      logger.error("Unable to start sink task", e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void put(Collection<SinkRecord> records) {
+    // Spin off a new thread to handle this operation? Downside is ordering and retries...
+    Map<String, BatchRecords> batchRecordsMap = new HashMap<>();
+    for (SinkRecord record : records) {
+      updateRegionsByTopic(record, batchRecordsMap);
+    }
+    batchRecordsMap.entrySet().stream().forEach((entry) -> {
+      String region = entry.getKey();
+      BatchRecords batchRecords = entry.getValue();
+      batchRecords.executeOperations(regionNameToRegion.get(region));
+    });
+  }
+
+  private void updateRegionsByTopic(SinkRecord sinkRecord, Map<String, BatchRecords> batchRecordsMap) {
+    Collection<String> regionsToUpdate = topicToRegions.get(sinkRecord.topic());
+    for (String region : regionsToUpdate) {
+      updateBatchRecordsForRecord(sinkRecord, batchRecordsMap, region);
+    }
+  }
+
+  private void updateBatchRecordsForRecord(SinkRecord record, Map<String, BatchRecords> batchRecordsMap, String region) {
+    BatchRecords batchRecords = batchRecordsMap.get(region);
+    if (batchRecords == null) {
+      batchRecords = new BatchRecords();
+      batchRecordsMap.put(region, batchRecords);
+    }
+    if (record.key() != null) {
+      if (record.value() == null && nullValuesMeansRemove) {
+        batchRecords.addRemoveOperation(record);
+      } else {
+        batchRecords.addUpdateOperation(record, nullValuesMeansRemove);
+      }
+    } else {
+      // Invest in a configurable auto key generator?
+      logger.warn("Unable to push to Geode, missing key in record : " + record.value());
+    }
+  }
+
+  private Map<String, Region> createProxyRegions(Collection<List<String>> regionNames) {
+    List<String> flat = regionNames.stream().flatMap(List::stream).collect(Collectors.toList());
+    return flat.stream().map(regionName -> createProxyRegion(regionName))
+        .collect(Collectors.toMap(region -> region.getName(), region -> region));
+  }
+
+  private Region createProxyRegion(String regionName) {
+    return geodeContext.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
+  }
+
+  @Override
+  public void stop() {
+    geodeContext.getClientCache().close(false);
+  }
+
+}
\ No newline at end of file
diff --git a/src/test/java/geode/kafka/sink/BatchRecordsTest.java b/src/test/java/geode/kafka/sink/BatchRecordsTest.java
new file mode 100644
index 0000000..4907d74
--- /dev/null
+++ b/src/test/java/geode/kafka/sink/BatchRecordsTest.java
@@ -0,0 +1,39 @@
+package geode.kafka.sink;
+
+import org.junit.Test;
+
+public class BatchRecordsTest {
+
+  @Test
+  public void updatingARecordShouldRemoveFromTheRemoveListIfNullValuesIsRemoveBooleanIsSet() {
+
+  }
+
+  @Test
+  public void updatingARecordShouldAddToTheUpdateMap() {
+
+  }
+
+  @Test
+  public void updatingARecordShouldNotRemoveFromTheRemoveListIfNullValuesIsNotSet() {
+
+  }
+
+  @Test
+  public void removingARecordShouldRemoveFromTheUpdateMap() {
+
+  }
+
+  @Test
+  public void removingARecordAddToTheRemoveCollection() {
+
+  }
+
+  @Test
+  public void executeOperationsShouldInvokePutAllAndRemoveAll() {
+
+  }
+
+}
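The BatchRecordsTest methods above are committed as empty stubs. For reference, a minimal sketch of how two of them might be filled in, assuming Mockito is available on the test classpath and keeping the JUnit 4 style the stub class already uses; the sketch class name and the record(...) helper are illustrative and not part of this commit.

package geode.kafka.sink;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.geode.cache.Region;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;

public class BatchRecordsSketchTest {

  // Schemas are irrelevant to BatchRecords, so a bare key/value record is enough.
  private SinkRecord record(Object key, Object value) {
    return new SinkRecord("someTopic", 0, null, key, null, value, 0L);
  }

  @Test
  public void executeOperationsShouldInvokePutAllAndRemoveAll() {
    BatchRecords batch = new BatchRecords();
    batch.addUpdateOperation(record("updatedKey", "value"), true);
    batch.addRemoveOperation(record("removedKey", null));

    Region region = mock(Region.class);
    batch.executeOperations(region);

    // The batch is flushed as one putAll for updates and one removeAll for removes.
    Map<Object, Object> expectedPuts = new HashMap<>();
    expectedPuts.put("updatedKey", "value");
    verify(region).putAll(expectedPuts);
    verify(region).removeAll(Collections.singletonList("removedKey"));
  }

  @Test
  public void removingARecordShouldRemoveFromTheUpdateMap() {
    BatchRecords batch = new BatchRecords();
    batch.addUpdateOperation(record("key", "value"), true);
    // A remove for a key already queued for update cancels the update
    // instead of queuing a separate removal.
    batch.addRemoveOperation(record("key", null));

    Region region = mock(Region.class);
    batch.executeOperations(region);

    verify(region).putAll(Collections.emptyMap());
    verify(region).removeAll(Collections.emptyList());
  }
}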

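In the same spirit, a sketch of driving the new put() path without a Geode cluster; it again assumes Mockito, and it leans on the fact that topicToRegions and regionNameToRegion are package-private so a test in the same package can wire them directly. The test class, topic, and region names are only illustrative.

package geode.kafka.sink;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.geode.cache.Region;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;

public class GeodeKafkaSinkTaskPutSketchTest {

  @Test
  public void putShouldWriteEachRecordToTheRegionsMappedToItsTopic() {
    GeodeKafkaSinkTask task = new GeodeKafkaSinkTask();

    // Wire the package-private fields directly instead of calling start(),
    // so no GeodeContext or client cache is needed for this sketch.
    Region region = mock(Region.class);
    task.topicToRegions = new HashMap<>();
    task.topicToRegions.put("someTopic", Collections.singletonList("someRegion"));
    task.regionNameToRegion = new HashMap<>();
    task.regionNameToRegion.put("someRegion", region);

    SinkRecord update = new SinkRecord("someTopic", 0, null, "updatedKey", null, "value", 0L);
    SinkRecord remove = new SinkRecord("someTopic", 0, null, "removedKey", null, null, 1L);
    task.put(Arrays.asList(update, remove));

    // The non-null value becomes a putAll entry; the null value becomes a removeAll entry.
    Map<Object, Object> expectedPuts = new HashMap<>();
    expectedPuts.put("updatedKey", "value");
    verify(region).putAll(expectedPuts);
    verify(region).removeAll(Collections.singletonList("removedKey"));
  }
}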