This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 1258cfc80c35a1c505bef05d4cf315671bf37333 Author: Jason Huynh <[email protected]> AuthorDate: Fri Jan 24 22:21:03 2020 -0800 Added loading entire region through initial results --- .../java/geode/kafka/GeodeConnectorConfig.java | 3 +- src/main/java/geode/kafka/GeodeContext.java | 11 ++++- .../java/geode/kafka/source/GeodeKafkaSource.java | 3 ++ .../kafka/source/GeodeKafkaSourceListener.java | 9 +++++ .../geode/kafka/source/GeodeKafkaSourceTask.java | 35 +++++++++++----- .../java/geode/kafka/GeodeConnectorConfigTest.java | 14 +++++++ .../kafka/source/GeodeKafkaSourceTaskTest.java | 47 +++++++++++++++++++++- .../geode/kafka/source/GeodeKafkaSourceTest.java | 11 +---- 8 files changed, 111 insertions(+), 22 deletions(-) diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java index 4f75ec0..8e79e1f 100644 --- a/src/main/java/geode/kafka/GeodeConnectorConfig.java +++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java @@ -13,7 +13,6 @@ public class GeodeConnectorConfig { public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout"; public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000"; - //GeodeKafka Specific Configuration public static final String TASK_ID = "GEODE_TASK_ID"; //One config per task @@ -46,6 +45,8 @@ public class GeodeConnectorConfig { public static final String QUEUE_SIZE = "geodeConnectorQueueSize"; public static final String DEFAULT_QUEUE_SIZE = "100000"; + public static final String LOAD_ENTIRE_REGION = "loadEntireRegion"; + public static final String DEFAULT_LOAD_ENTIRE_REGION = "false"; private final int taskId; diff --git a/src/main/java/geode/kafka/GeodeContext.java b/src/main/java/geode/kafka/GeodeContext.java index 4582b93..d1fd3ae 100644 --- a/src/main/java/geode/kafka/GeodeContext.java +++ b/src/main/java/geode/kafka/GeodeContext.java @@ -9,6 +9,7 @@ import org.apache.geode.cache.query.CqQuery; import org.apache.geode.cache.query.RegionNotFoundException; import org.apache.kafka.connect.errors.ConnectException; +import java.util.Collection; import java.util.List; public class GeodeContext { @@ -40,7 +41,15 @@ public class GeodeContext { cq.execute(); return cq; } catch (RegionNotFoundException | CqException | CqExistsException e) { - e.printStackTrace(); + throw new ConnectException(e); + } + } + + public Collection newCqWithInitialResults(String name, String query, CqAttributes cqAttributes, boolean isDurable) throws ConnectException { + try { + CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable); + return cq.executeWithInitialResults(); + } catch (RegionNotFoundException | CqException | CqExistsException e) { throw new ConnectException(e); } } diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java index 36b9942..8f60471 100644 --- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java +++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java @@ -17,10 +17,12 @@ import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE; import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX; import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID; import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOAD_ENTIRE_REGION; import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR; import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE; import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; +import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION; import static geode.kafka.GeodeConnectorConfig.LOCATORS; import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE; @@ -69,6 +71,7 @@ public class GeodeKafkaSource extends SourceConnector { props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE); props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE); props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX); + props.computeIfAbsent(LOAD_ENTIRE_REGION, (key) -> DEFAULT_LOAD_ENTIRE_REGION); return props; } diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java index 019bb5a..8ae0045 100644 --- a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java +++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java @@ -14,14 +14,19 @@ class GeodeKafkaSourceListener implements CqStatusListener { public String regionName; private BlockingQueue<GeodeEvent> eventBuffer; + private boolean initialResultsLoaded; public GeodeKafkaSourceListener(BlockingQueue<GeodeEvent> eventBuffer, String regionName) { this.eventBuffer = eventBuffer; this.regionName = regionName; + initialResultsLoaded = false; } @Override public void onEvent(CqEvent aCqEvent) { + while (!initialResultsLoaded) { + Thread.yield(); + } try { eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS); } catch (InterruptedException e) { @@ -52,4 +57,8 @@ class GeodeKafkaSourceListener implements CqStatusListener { public void onCqConnected() { } + + public void signalInitialResultsLoaded() { + initialResultsLoaded = true; + } } diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java index 829eb29..657b274 100644 --- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java +++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java @@ -1,15 +1,17 @@ package geode.kafka.source; -import geode.kafka.GeodeContext; import geode.kafka.GeodeConnectorConfig; +import geode.kafka.GeodeContext; import org.apache.geode.cache.query.CqAttributes; import org.apache.geode.cache.query.CqAttributesFactory; +import org.apache.geode.cache.query.CqEvent; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -19,6 +21,7 @@ import java.util.stream.Collectors; import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE; import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX; +import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION; import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE; import static geode.kafka.GeodeConnectorConfig.REGION_NAME; @@ -33,6 +36,7 @@ public class GeodeKafkaSourceTask extends SourceTask { private static final Map<String, Long> OFFSET_DEFAULT = createOffset(); private GeodeContext geodeContext; + private GeodeConnectorConfig geodeConnectorConfig; private List<String> topics; private Map<String, Map<String, String>> sourcePartitions; private static BlockingQueue<GeodeEvent> eventBuffer; @@ -53,7 +57,7 @@ public class GeodeKafkaSourceTask extends SourceTask { @Override public void start(Map<String, String> props) { try { - GeodeConnectorConfig geodeConnectorConfig = new GeodeConnectorConfig(props); + geodeConnectorConfig = new GeodeConnectorConfig(props); logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting"); geodeContext = new GeodeContext(geodeConnectorConfig); @@ -65,8 +69,9 @@ public class GeodeKafkaSourceTask extends SourceTask { topics = geodeConnectorConfig.getTopics(); String cqPrefix = props.get(CQ_PREFIX); + boolean loadEntireRegion = Boolean.parseBoolean(props.get(LOAD_ENTIRE_REGION)); - installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix); + installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion); } catch (Exception e) { logger.error("Unable to start source task", e); @@ -95,23 +100,35 @@ public class GeodeKafkaSourceTask extends SourceTask { geodeContext.getClientCache().close(true); } - void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix) { + void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) { boolean isDurable = geodeConnectorConfig.isDurable(); int taskId = geodeConnectorConfig.getTaskId(); for (String region : geodeConnectorConfig.getRegionNames()) { - installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, isDurable); + installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, loadEntireRegion, isDurable); } if (isDurable) { geodeContext.getClientCache().readyForEvents(); } } - void installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) { + void installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) { CqAttributesFactory cqAttributesFactory = new CqAttributesFactory(); - cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName)); + GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName); + cqAttributesFactory.addCqListener(listener); CqAttributes cqAttributes = cqAttributesFactory.create(); - geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes, - isDurable); + try { + if (loadEntireRegion) { + Collection<CqEvent> events = geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes, + isDurable); + eventBuffer.addAll(events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList())); + } else { + geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes, + isDurable); + } + } + finally { + listener.signalInitialResultsLoaded(); + } } /** diff --git a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java index 54f9e52..8f06f61 100644 --- a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java +++ b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java @@ -2,7 +2,9 @@ package geode.kafka; import org.junit.Test; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.CoreMatchers.is; @@ -41,4 +43,16 @@ public class GeodeConnectorConfigTest { List<LocatorHostPort> locators = config.parseLocators(locatorString); assertThat(2, is(locators.size())); } + + @Test + public void durableClientIdShouldNotBeSetIfPropertyIsNotSet() { + Map<String, String> props = new HashMap<>(); + GeodeConnectorConfig config = new GeodeConnectorConfig(props); + assertEquals("", config.getDurableClientId()); + } + + @Test + public void cqPrefixShouldBeProperlyCalculatedFromProps() { + + } } diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java index de78345..8f86cbc 100644 --- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java +++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java @@ -1,25 +1,70 @@ package geode.kafka.source; import geode.kafka.GeodeConnectorConfig; +import geode.kafka.GeodeContext; +import org.apache.geode.cache.query.CqEvent; import org.junit.Test; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX; import static geode.kafka.GeodeConnectorConfig.REGION_NAME; import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class GeodeKafkaSourceTaskTest { + @Test - public void cqListenerOnEventPopulatesEventsBuffer() { + public void whenLoadingEntireRegionAbleToPutInitialResultsIntoEventBuffer() { + GeodeContext geodeContext = mock(GeodeContext.class); + BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100); + boolean loadEntireRegion = true; + boolean isDurable = false; + List<CqEvent> fakeInitialResults = new LinkedList<>(); + for (int i = 0; i < 10; i++) { + fakeInitialResults.add(mock(CqEvent.class)); + } + + when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults); + GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); + task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable); + assertEquals(10, eventBuffer.size()); + } + @Test + public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() { + GeodeContext geodeContext = mock(GeodeContext.class); + BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100); + boolean loadEntireRegion = false; + boolean isDurable = false; + List<CqEvent> fakeInitialResults = new LinkedList<>(); + for (int i = 0; i < 10; i++) { + fakeInitialResults.add(mock(CqEvent.class)); + } + + when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults); + GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); + task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable); + assertEquals(0, eventBuffer.size()); } @Test + public void cqListenerOnEventPopulatesEventsBuffer() {} + + @Test public void pollReturnsEventsWhenEventBufferHasValues() { } diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java index 717d495..5ffd363 100644 --- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java +++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java @@ -1,5 +1,6 @@ package geode.kafka.source; +import geode.kafka.GeodeConnectorConfig; import org.junit.Test; import java.util.HashMap; @@ -7,16 +8,6 @@ import java.util.Map; public class GeodeKafkaSourceTest { - @Test - public void durableClientIdShouldNotBeSetIfPropertyIsNotSet() { - GeodeKafkaSource source = new GeodeKafkaSource(); - Map<String, String> props = new HashMap<>(); - source.start(props); - } - @Test - public void cqPrefixShouldBeProperlyCalculatedFromProps() { - - } }
