Updated locking so that multiple threads can offer to the queue concurrently, but only one thread at a time can swap the queue reference
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f0085694 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f0085694 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f0085694 Branch: refs/heads/master Commit: f00856946e6cb2ee83d05e96af29730d0f88dfe9 Parents: b5fd7e7 Author: mfranklin <[email protected]> Authored: Tue Apr 22 13:08:58 2014 -0400 Committer: mfranklin <[email protected]> Committed: Tue Apr 22 13:08:58 2014 -0400 ---------------------------------------------------------------------- .../ElasticsearchPersistReader.java | 20 +++++++++++++++++--- .../elasticsearch/ElasticsearchQuery.java | 2 -- 2 files changed, 17 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f0085694/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java index fd2a155..54d29d2 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java @@ -15,6 +15,8 @@ import java.io.Serializable; import java.math.BigInteger; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * 
*********************************************************************************************************** @@ -35,6 +37,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali private ElasticsearchReaderConfiguration config; private int threadPoolSize = 10; private ExecutorService executor; + private ReadWriteLock lock = new ReentrantReadWriteLock(); public ElasticsearchPersistReader() { } @@ -68,13 +71,16 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali StreamsResultSet current; - synchronized (ElasticsearchPersistReader.class) { + try { + lock.writeLock().lock(); current = new StreamsResultSet(persistQueue); current.setCounter(new DatumStatusCounter()); // current.getCounter().add(countersCurrent); // countersTotal.add(countersCurrent); // countersCurrent = new DatumStatusCounter(); persistQueue = constructQueue(); + } finally { + lock.writeLock().unlock(); } return current; @@ -99,11 +105,19 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali LOGGER.info("PersistReader done"); } + //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue + //as it is a synchronized queue. 
What we do care about is that we don't want to be offering to the current reference + //if the queue is being replaced with a new instance protected void write(StreamsDatum entry) { boolean success; do { - success = persistQueue.offer(entry); - Thread.yield(); + try { + lock.readLock().lock(); + success = persistQueue.offer(entry); + Thread.yield(); + }finally { + lock.readLock().unlock(); + } } while (!success); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f0085694/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java index 8c9abda..2430c41 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java @@ -20,8 +20,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchHit>, Serializable {
