This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 9746583899c196a48f5ecdc35955715ac239e440 Author: Jason Huynh <[email protected]> AuthorDate: Mon Jan 27 16:25:42 2020 -0800 Split regions and topics per task. --- build.gradle | 3 +- .../java/geode/kafka/GeodeConnectorConfig.java | 7 +++- src/main/java/geode/kafka/sink/GeodeKafkaSink.java | 12 ++++++- .../java/geode/kafka/source/GeodeKafkaSource.java | 12 ++++++- .../geode/kafka/source/GeodeKafkaSourceTask.java | 13 +++++-- .../kafka/source/GeodeKafkaSourceTaskTest.java | 42 ++++++++++++++++++---- 6 files changed, 74 insertions(+), 15 deletions(-) diff --git a/build.gradle b/build.gradle index 87a2c36..515967d 100644 --- a/build.gradle +++ b/build.gradle @@ -26,8 +26,7 @@ dependencies { testCompile(group: 'org.apache.kafka', name: 'connect-runtime', version: '2.3.1') testCompile group: 'junit', name: 'junit', version: '4.12' + testCompile 'org.mockito:mockito-core:3.2.4' testImplementation 'org.awaitility:awaitility:4.0.2' - - } diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java index 8e79e1f..11eff62 100644 --- a/src/main/java/geode/kafka/GeodeConnectorConfig.java +++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java @@ -1,6 +1,7 @@ package geode.kafka; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -80,10 +81,14 @@ public class GeodeConnectorConfig { locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS)); } - List<String> parseNames(String names) { + public static List<String> parseNames(String names) { return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList()); } + public static String reconstructString(Collection<String> strings) { + return strings.stream().collect(Collectors.joining(",")); + } + List<LocatorHostPort> parseLocators(String locators) { return Arrays.stream(locators.split(",")).map((s) -> { String locatorString = s.trim(); diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java index 08af2ad..c7afb96 100644 --- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java +++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java @@ -4,6 +4,7 @@ import geode.kafka.GeodeConnectorConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.util.ConnectorUtils; import java.util.ArrayList; import java.util.HashMap; @@ -22,6 +23,8 @@ import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; import static geode.kafka.GeodeConnectorConfig.LOCATORS; import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE; +import static geode.kafka.GeodeConnectorConfig.REGIONS; +import static geode.kafka.GeodeConnectorConfig.TOPICS; public class GeodeKafkaSink extends SinkConnector { private static final ConfigDef CONFIG_DEF = new ConfigDef(); @@ -41,11 +44,18 @@ public class GeodeKafkaSink extends SinkConnector { public List<Map<String, String>> taskConfigs(int maxTasks) { List<Map<String, String>> taskConfigs = new ArrayList<>(); Map<String, String> taskProps = new HashMap<>(); - taskProps.putAll(sharedProps); + List<String> topics = GeodeConnectorConfig.parseNames(taskProps.get(TOPICS)); + List<List<String>> topicsPerTask = ConnectorUtils.groupPartitions(topics, maxTasks); + + List<String> regions = GeodeConnectorConfig.parseNames(taskProps.get(REGIONS)); + List<List<String>> regionsPerTask = ConnectorUtils.groupPartitions(regions, maxTasks); + for (int i = 0; i < maxTasks; i++) { taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i); + taskProps.put(TOPICS, GeodeConnectorConfig.reconstructString(topicsPerTask.get(i))); + taskProps.put(REGIONS, GeodeConnectorConfig.reconstructString(regionsPerTask.get(i))); taskConfigs.add(taskProps); } diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java index 8f60471..ca3f2c8 100644 --- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java +++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java @@ -5,6 +5,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.util.ConnectorUtils; import java.util.ArrayList; import java.util.HashMap; @@ -25,6 +26,8 @@ import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION; import static geode.kafka.GeodeConnectorConfig.LOCATORS; import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE; +import static geode.kafka.GeodeConnectorConfig.REGIONS; +import static geode.kafka.GeodeConnectorConfig.TOPICS; public class GeodeKafkaSource extends SourceConnector { @@ -45,9 +48,16 @@ public class GeodeKafkaSource extends SourceConnector { Map<String, String> taskProps = new HashMap<>(); taskProps.putAll(sharedProps); + List<String> topics = GeodeConnectorConfig.parseNames(taskProps.get(TOPICS)); + List<List<String>> topicsPerTask = ConnectorUtils.groupPartitions(topics, maxTasks); + + List<String> regions = GeodeConnectorConfig.parseNames(taskProps.get(REGIONS)); + List<List<String>> regionsPerTask = ConnectorUtils.groupPartitions(regions, maxTasks); + for (int i = 0; i < maxTasks; i++) { - //TODO partition regions and topics taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i); + taskProps.put(TOPICS, GeodeConnectorConfig.reconstructString(topicsPerTask.get(i))); + taskProps.put(REGIONS, GeodeConnectorConfig.reconstructString(regionsPerTask.get(i))); taskConfigs.add(taskProps); } return taskConfigs; diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java index 657b274..8080179 100644 --- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java +++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java @@ -21,6 +21,7 @@ import java.util.stream.Collectors; import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE; import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE; import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION; import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE; import static geode.kafka.GeodeConnectorConfig.REGION_NAME; @@ -39,7 +40,7 @@ public class GeodeKafkaSourceTask extends SourceTask { private GeodeConnectorConfig geodeConnectorConfig; private List<String> topics; private Map<String, Map<String, String>> sourcePartitions; - private static BlockingQueue<GeodeEvent> eventBuffer; + private BlockingQueue<GeodeEvent> eventBuffer; private int batchSize; @@ -54,6 +55,12 @@ public class GeodeKafkaSourceTask extends SourceTask { return null; } + void startForTesting(BlockingQueue eventBuffer, List<String> topics, int batchSize) { + this.eventBuffer = eventBuffer; + this.topics = topics; + this.batchSize = batchSize; + } + @Override public void start(Map<String, String> props) { try { @@ -111,7 +118,7 @@ public class GeodeKafkaSourceTask extends SourceTask { } } - void installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) { + GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) { CqAttributesFactory cqAttributesFactory = new CqAttributesFactory(); GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName); cqAttributesFactory.addCqListener(listener); @@ -129,6 +136,7 @@ public class GeodeKafkaSourceTask extends SourceTask { finally { listener.signalInitialResultsLoaded(); } + return listener; } /** @@ -148,5 +156,4 @@ public class GeodeKafkaSourceTask extends SourceTask { String generateCqName(int taskId, String cqPrefix, String regionName) { return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName; } - } diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java index 8f86cbc..041adc8 100644 --- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java +++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java @@ -3,8 +3,10 @@ package geode.kafka.source; import geode.kafka.GeodeConnectorConfig; import geode.kafka.GeodeContext; import org.apache.geode.cache.query.CqEvent; +import org.apache.kafka.connect.source.SourceRecord; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; @@ -13,8 +15,15 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE; import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR; +import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; +import static geode.kafka.GeodeConnectorConfig.LOCATORS; +import static geode.kafka.GeodeConnectorConfig.REGIONS; import static geode.kafka.GeodeConnectorConfig.REGION_NAME; +import static geode.kafka.GeodeConnectorConfig.TASK_ID; +import static geode.kafka.GeodeConnectorConfig.TOPICS; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -24,6 +33,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; + public class GeodeKafkaSourceTaskTest { @@ -62,16 +72,36 @@ public class GeodeKafkaSourceTaskTest { } @Test - public void cqListenerOnEventPopulatesEventsBuffer() {} + public void cqListenerOnEventPopulatesEventsBuffer() { + GeodeContext geodeContext = mock(GeodeContext.class); + BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100); + boolean loadEntireRegion = false; + boolean isDurable = false; - @Test - public void pollReturnsEventsWhenEventBufferHasValues() { + when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(new ArrayList()); + GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); + GeodeKafkaSourceListener listener = task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable); + listener.onEvent(mock(CqEvent.class)); + assertEquals(1, eventBuffer.size()); } @Test - public void regionsArePassedCorrectlyToTask() { - + public void pollReturnsEventsWhenEventBufferHasValues() throws Exception { +// BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100); +// CqEvent cqEvent = mock(CqEvent.class); +// when(cqEvent.getNewValue()).thenReturn("New Value"); +// GeodeEvent event = mock(GeodeEvent.class); +// when(event.getEvent()).thenReturn(cqEvent); +// eventBuffer.add(event); +// +// List<String> topics = new ArrayList<>(); +// topics.add("myTopic"); +// +// GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); +// task.startForTesting(eventBuffer, topics, 1); +// List<SourceRecord> records = task.poll(); +// assertEquals(1, records.size()); } @Test @@ -94,8 +124,6 @@ public class GeodeKafkaSourceTaskTest { assertThat(true, is(sourcePartitions.get("region3").get(REGION_NAME).equals("region3"))); } - - @Test public void listOfLocatorsShouldBeConfiguredIntoClientCache() {
