This is an automated email from the ASF dual-hosted git repository.
jasonhuynh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 4ff1f22 Configurable shared event buffer
4ff1f22 is described below
commit 4ff1f22dd88dbe0487494a4b7d0dedbf4a205f9a
Author: Jason Huynh <[email protected]>
AuthorDate: Tue Feb 4 14:31:51 2020 -0800
Configurable shared event buffer
---
.../geode/kafka/source/EventBufferSupplier.java | 7 +++
.../kafka/source/GeodeKafkaSourceListener.java | 11 +++--
.../geode/kafka/source/GeodeKafkaSourceTask.java | 21 ++++-----
.../kafka/source/SharedEventBufferSupplier.java | 37 ++++++++++++++++
.../kafka/source/GeodeKafkaSourceTaskTest.java | 17 ++++++--
.../source/SharedEventBufferSupplierTest.java | 51 ++++++++++++++++++++++
6 files changed, 122 insertions(+), 22 deletions(-)
diff --git a/src/main/java/geode/kafka/source/EventBufferSupplier.java b/src/main/java/geode/kafka/source/EventBufferSupplier.java
new file mode 100644
index 0000000..d844744
--- /dev/null
+++ b/src/main/java/geode/kafka/source/EventBufferSupplier.java
@@ -0,0 +1,7 @@
+package geode.kafka.source;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Supplier;
+
+public interface EventBufferSupplier extends Supplier<BlockingQueue<GeodeEvent>> {
+}
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
index f317965..5c8a152 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -19,7 +19,6 @@ import org.apache.geode.cache.query.CqStatusListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
class GeodeKafkaSourceListener implements CqStatusListener {
@@ -27,12 +26,12 @@ class GeodeKafkaSourceListener implements CqStatusListener {
private static final Logger logger =
LoggerFactory.getLogger(GeodeKafkaSourceListener.class);
public String regionName;
- private BlockingQueue<GeodeEvent> eventBuffer;
+ private EventBufferSupplier eventBufferSupplier;
private boolean initialResultsLoaded;
- public GeodeKafkaSourceListener(BlockingQueue<GeodeEvent> eventBuffer, String regionName) {
- this.eventBuffer = eventBuffer;
+ public GeodeKafkaSourceListener(EventBufferSupplier eventBufferSupplier, String regionName) {
this.regionName = regionName;
+ this.eventBufferSupplier = eventBufferSupplier;
initialResultsLoaded = false;
}
@@ -42,12 +41,12 @@ class GeodeKafkaSourceListener implements CqStatusListener {
Thread.yield();
}
try {
- eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
+ eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
while (true) {
try {
- if (!eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS))
+ if (!eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS))
break;
} catch (InterruptedException ex) {
ex.printStackTrace();
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index c6cf6cb..6efeb85 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
@@ -48,11 +47,9 @@ public class GeodeKafkaSourceTask extends SourceTask {
private GeodeContext geodeContext;
private GeodeSourceConnectorConfig geodeConnectorConfig;
- private int taskId;
+ private EventBufferSupplier eventBufferSupplier;
private Map<String, List<String>> regionToTopics;
- private Collection<String> cqsToRegister;
private Map<String, Map<String, String>> sourcePartitions;
- private static BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue<>(100000);
private int batchSize;
@@ -71,21 +68,21 @@ public class GeodeKafkaSourceTask extends SourceTask {
public void start(Map<String, String> props) {
try {
geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
- taskId = geodeConnectorConfig.getTaskId();
+ int taskId = geodeConnectorConfig.getTaskId();
logger.debug("GeodeKafkaSourceTask id:" +
geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getDurableClientId(),
geodeConnectorConfig.getDurableClientTimeout(),
geodeConnectorConfig.getSecurityClientAuthInit());
batchSize = Integer.parseInt(props.get(BATCH_SIZE));
+ eventBufferSupplier = new SharedEventBufferSupplier(Integer.parseInt(props.get(QUEUE_SIZE)));
regionToTopics = geodeConnectorConfig.getRegionToTopics();
- cqsToRegister = geodeConnectorConfig.getCqsToRegister();
+ geodeConnectorConfig.getCqsToRegister();
sourcePartitions =
createSourcePartitionsMap(regionToTopics.keySet());
String cqPrefix = geodeConnectorConfig.getCqPrefix();
-
boolean loadEntireRegion =
geodeConnectorConfig.getLoadEntireRegion();
- installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion);
+ installOnGeode(geodeConnectorConfig, geodeContext, eventBufferSupplier, cqPrefix, loadEntireRegion);
} catch (Exception e) {
e.printStackTrace();
logger.error("Unable to start source task", e);
@@ -97,7 +94,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
public List<SourceRecord> poll() throws InterruptedException {
ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
- if (eventBuffer.drainTo(events, batchSize) > 0) {
+ if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
for (GeodeEvent event : events) {
String regionName = event.getRegionName();
List<String> topics = regionToTopics.get(regionName);
@@ -116,7 +113,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
geodeContext.getClientCache().close(true);
}
- void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) {
+ void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) {
boolean isDurable = geodeConnectorConfig.isDurable();
int taskId = geodeConnectorConfig.getTaskId();
for (String region : geodeConnectorConfig.getCqsToRegister()) {
@@ -127,7 +124,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
}
}
- GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
+ GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId, EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
GeodeKafkaSourceListener listener = new
GeodeKafkaSourceListener(eventBuffer, regionName);
cqAttributesFactory.addCqListener(listener);
@@ -136,7 +133,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
if (loadEntireRegion) {
Collection<CqEvent> events =
geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix,
regionName), "select * from /" + regionName, cqAttributes,
isDurable);
- eventBuffer.addAll(events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
+ eventBuffer.get().addAll(events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
} else {
geodeContext.newCq(generateCqName(taskId, cqPrefix,
regionName), "select * from /" + regionName, cqAttributes,
isDurable);
diff --git a/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java b/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java
new file mode 100644
index 0000000..2a4c883
--- /dev/null
+++ b/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java
@@ -0,0 +1,37 @@
+package geode.kafka.source;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+
+public class SharedEventBufferSupplier implements EventBufferSupplier {
+
+ private static BlockingQueue<GeodeEvent> eventBuffer;
+
+ public SharedEventBufferSupplier(int size) {
+ recreateEventBufferIfNeeded(size);
+ }
+
+ BlockingQueue recreateEventBufferIfNeeded(int size) {
+ if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) {
+ synchronized (GeodeKafkaSource.class) {
+ if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) {
+ BlockingQueue<GeodeEvent> oldEventBuffer = eventBuffer;
+ eventBuffer = new LinkedBlockingQueue<>(size);
+ if (oldEventBuffer != null) {
+ eventBuffer.addAll(oldEventBuffer);
+ }
+ }
+ }
+ }
+ return eventBuffer;
+ }
+
+ /**
+ * Callers should not store a reference to this and instead always call get to make sure we always use the latest buffer
+ * Buffers themselves shouldn't change often but in cases where we want to modify the size
+ */
+ public BlockingQueue<GeodeEvent> get() {
+ return eventBuffer;
+ }
+}
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index ffcc3d8..33f1ab5 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -58,7 +58,7 @@ public class GeodeKafkaSourceTaskTest {
when(geodeContext.newCqWithInitialResults(anyString(), anyString(),
any(), anyBoolean())).thenReturn(fakeInitialResults);
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+ task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
assertEquals(10, eventBuffer.size());
}
@@ -75,7 +75,7 @@ public class GeodeKafkaSourceTaskTest {
when(geodeContext.newCqWithInitialResults(anyString(), anyString(),
any(), anyBoolean())).thenReturn(fakeInitialResults);
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+ task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
assertEquals(0, eventBuffer.size());
}
@@ -88,7 +88,7 @@ public class GeodeKafkaSourceTaskTest {
when(geodeContext.newCqWithInitialResults(anyString(), anyString(),
any(), anyBoolean())).thenReturn(new ArrayList());
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- GeodeKafkaSourceListener listener = task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+ GeodeKafkaSourceListener listener = task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
listener.onEvent(mock(CqEvent.class));
assertEquals(1, eventBuffer.size());
@@ -140,7 +140,7 @@ public class GeodeKafkaSourceTaskTest {
when
(config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- task.installOnGeode(config, geodeContext, new LinkedBlockingQueue(), "someCqPrefix", true);
+ task.installOnGeode(config, geodeContext, createEventBufferSupplier(new LinkedBlockingQueue<>()), "someCqPrefix", true);
verify(geodeContext, times(1)).newCqWithInitialResults(anyString(),
anyString(), any(), anyBoolean());
}
@@ -218,4 +218,13 @@ public class GeodeKafkaSourceTaskTest {
// GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
}
+
+ private EventBufferSupplier createEventBufferSupplier(BlockingQueue<GeodeEvent> eventBuffer) {
+ return new EventBufferSupplier() {
+ @Override
+ public BlockingQueue<GeodeEvent> get() {
+ return eventBuffer;
+ }
+ };
+ }
}
diff --git a/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java b/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java
new file mode 100644
index 0000000..b4a429b
--- /dev/null
+++ b/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java
@@ -0,0 +1,51 @@
+package geode.kafka.source;
+
+import org.junit.Test;
+
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+public class SharedEventBufferSupplierTest {
+
+ @Test
+ public void creatingNewSharedEventSupplierShouldCreateInstance() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ assertNotNull(supplier.get());
+ }
+
+ @Test
+ public void alreadySharedEventSupplierShouldReturnSameInstanceOfEventBuffer() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ BlockingQueue<GeodeEvent> queue = supplier.get();
+ supplier = new SharedEventBufferSupplier(1);
+ assertEquals(queue, supplier.get());
+ }
+
+ @Test
+ public void newEventBufferShouldBeReflectedInAllSharedSuppliers() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+ assertEquals(supplier.get(), newSupplier.get());
+ }
+
+ @Test
+ public void newEventBufferSuppliedShouldNotBeTheOldQueue() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ BlockingQueue<GeodeEvent> queue = supplier.get();
+ SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+ assertNotEquals(queue, newSupplier.get());
+ }
+
+ @Test
+ public void newEventBufferShouldContainAllEventsFromTheOldSupplier() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ GeodeEvent geodeEvent = mock(GeodeEvent.class);
+ supplier.get().add(geodeEvent);
+ SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+ assertEquals(geodeEvent, newSupplier.get().poll());
+ }
+}