GEODE-11: Refactor CollectorManager interface. CollectorManager creates new collectors and merges the results of those collectors. Previously, the merge result type could differ from the collector type. Instead, CollectorManager can use a collector itself to merge the results, so that the actions performed on members and on the search coordinator are the same.
https://reviews.apache.org/r/38320/ Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/01c4bc9f Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/01c4bc9f Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/01c4bc9f Branch: refs/heads/feature/GEODE-11 Commit: 01c4bc9f1ad72b0f18dac2cfc171b6dfff171aec Parents: 54bc45e Author: Ashvin Agrawal <[email protected]> Authored: Fri Sep 11 09:22:12 2015 -0700 Committer: Ashvin Agrawal <[email protected]> Committed: Fri Sep 11 14:55:10 2015 -0700 ---------------------------------------------------------------------- .../internal/distributed/CollectorManager.java | 13 +++++--- .../distributed/LuceneQueryFunction.java | 8 ++--- .../distributed/LuceneSearchFunctionArgs.java | 3 +- .../distributed/TopEntriesCollector.java | 6 +++- .../distributed/TopEntriesCollectorManager.java | 33 ++++++++++-------- .../LuceneQueryFunctionJUnitTest.java | 35 ++++++++++---------- .../TopEntriesCollectorJUnitTest.java | 9 +++-- 7 files changed, 59 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java index 8039d14..41c3f5f 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java @@ -3,15 +3,20 @@ package 
com.gemstone.gemfire.cache.lucene.internal.distributed; import java.io.IOException; import java.util.Collection; +import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector; /** - * This class is used to aggregate search results from multiple search requests. + * {@link CollectorManager}s create instances of {@link IndexResultCollector} and provide utility methods to aggregate + * results collected by individual collectors. The collectors created by this class are primarily used for collecting + * results from {@link IndexRepository}s. The collectors can also be used for aggregating results of other collectors + * of the same type. Search result aggregation is typically completed in two phases: first at the member level, merging + * results from local buckets, and then at the search coordinator level, merging results from members. Using the same + * collector in both phases is expected to produce a deterministic merge result irrespective of how buckets are distributed. * - * @param <T> Type of reduce result * @param <C> Type of IndexResultCollector created by this manager */ -public interface CollectorManager<T, C extends IndexResultCollector> { +public interface CollectorManager<C extends IndexResultCollector> { /** * @param name Name/Identifier for this collector. For e.g. region/bucketId. * @return a new {@link IndexResultCollector}.
This must return a different instance on @@ -25,5 +30,5 @@ public interface CollectorManager<T, C extends IndexResultCollector> { * * @throws IOException */ - T reduce(Collection<IndexResultCollector> results) throws IOException; + C reduce(Collection<C> results) throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java index 369ceb8..6e1d217 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java @@ -34,7 +34,7 @@ public class LuceneQueryFunction extends FunctionAdapter { @Override public void execute(FunctionContext context) { RegionFunctionContext ctx = (RegionFunctionContext) context; - ResultSender<TopEntries> resultSender = ctx.getResultSender(); + ResultSender<TopEntriesCollector> resultSender = ctx.getResultSender(); Region region = ctx.getDataSet(); if (logger.isDebugEnabled()) { @@ -46,7 +46,7 @@ public class LuceneQueryFunction extends FunctionAdapter { CollectorManager manager = (args == null) ? null : args.getCollectorManager(); if (manager == null) { int resultLimit = (args == null ? 
LuceneQueryFactory.DEFAULT_LIMIT : args.getLimit()); - manager = new TopEntriesCollectorManager(resultLimit); + manager = new TopEntriesCollectorManager(null, resultLimit); } Collection<IndexResultCollector> results = new ArrayList<>(); @@ -68,9 +68,9 @@ public class LuceneQueryFunction extends FunctionAdapter { return; } - TopEntries mergedResult; + TopEntriesCollector mergedResult; try { - mergedResult = (TopEntries) manager.reduce(results); + mergedResult = (TopEntriesCollector) manager.reduce(results); resultSender.lastResult(mergedResult); } catch (IOException e) { logger.warn("", e); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java index 2a5a5b6..52f3ce9 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java @@ -3,7 +3,6 @@ package com.gemstone.gemfire.cache.lucene.internal.distributed; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.Set; import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory; import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; @@ -48,7 +47,7 @@ public class LuceneSearchFunctionArgs implements VersionedDataSerializable { * * @return {@link CollectorManager} instance to be used by function */ - public <T, C extends IndexResultCollector> CollectorManager<T, C> getCollectorManager() { + public <C extends IndexResultCollector> 
CollectorManager<C> getCollectorManager() { return null; } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java index 6b02391..a4b5144 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java @@ -23,7 +23,11 @@ public class TopEntriesCollector implements IndexResultCollector { @Override public void collect(Object key, float score) { - entries.addHit(new EntryScore(key, score)); + collect(new EntryScore(key, score)); + } + + public void collect(EntryScore entry) { + entries.addHit(entry); } @Override http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java index 7360c3c..21e11ab 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java @@ -18,18 +18,24 @@ import com.gemstone.gemfire.internal.logging.LogService; * An implementation of {@link 
CollectorManager} for managing {@link TopEntriesCollector}. This is used by a member to * collect top matching entries from local buckets */ -public class TopEntriesCollectorManager implements CollectorManager<TopEntries, TopEntriesCollector> { +public class TopEntriesCollectorManager implements CollectorManager<TopEntriesCollector> { private static final Logger logger = LogService.getLogger(); final int limit; - + final String id; + public TopEntriesCollectorManager() { - this(LuceneQueryFactory.DEFAULT_LIMIT); + this(null, 0); } - - public TopEntriesCollectorManager(int resultLimit) { - this.limit = resultLimit; - logger.debug("Max count of entries to be returned: " + limit); + + public TopEntriesCollectorManager(String id) { + this(id, 0); + } + + public TopEntriesCollectorManager(String id, int resultLimit) { + this.limit = resultLimit < 0 ? LuceneQueryFactory.DEFAULT_LIMIT : resultLimit; + this.id = id == null ? String.valueOf(this.hashCode()) : id; + logger.debug("Max count of entries to be produced by {} is {}", id, limit); } @Override @@ -38,7 +44,7 @@ public class TopEntriesCollectorManager implements CollectorManager<TopEntries, } @Override - public TopEntries reduce(Collection<IndexResultCollector> collectors) throws IOException { + public TopEntriesCollector reduce(Collection<TopEntriesCollector> collectors) throws IOException { final EntryScoreComparator scoreComparator = new TopEntries().new EntryScoreComparator(); // orders a entry with higher score above a doc with lower score @@ -55,15 +61,13 @@ public class TopEntriesCollectorManager implements CollectorManager<TopEntries, // using score comparator. 
PriorityQueue<List<EntryScore>> entryListsPriorityQueue; entryListsPriorityQueue = new PriorityQueue<List<EntryScore>>(Collections.reverseOrder(entryListComparator)); - TopEntries mergedResult = new TopEntries(); + TopEntriesCollector mergedResult = new TopEntriesCollector(id, limit); for (IndexResultCollector collector : collectors) { - if (logger.isDebugEnabled()) { - logger.debug("Number of entries found in bucket {} is {}", collector.getName(), collector.size()); - } + logger.debug("Number of entries found in collector {} is {}", collector.getName(), collector.size()); if (collector.size() > 0) { - entryListsPriorityQueue.add(((TopEntriesCollector)collector).getEntries().getHits()); + entryListsPriorityQueue.add(((TopEntriesCollector) collector).getEntries().getHits()); } } @@ -72,13 +76,14 @@ public class TopEntriesCollectorManager implements CollectorManager<TopEntries, List<EntryScore> list = entryListsPriorityQueue.remove(); EntryScore entry = list.remove(0); - mergedResult.addHit(entry); + mergedResult.collect(entry); if (list.size() > 0) { entryListsPriorityQueue.add(list); } } + logger.debug("Reduced size of {} is {}", mergedResult.name, mergedResult.size()); return mergedResult; } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java index b7b3f1d..ee2847e 100644 --- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java +++ 
b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java @@ -42,7 +42,7 @@ public class LuceneQueryFunctionJUnitTest { @Test public void testRepoQueryAndMerge() throws Exception { - final AtomicReference<TopEntries> result = new AtomicReference<>(); + final AtomicReference<TopEntriesCollector> result = new AtomicReference<>(); final QueryMocks m = new QueryMocks(); mocker.checking(new Expectations() { @@ -81,11 +81,11 @@ public class LuceneQueryFunctionJUnitTest { } }); - oneOf(m.mockResultSender).lastResult(with(any(TopEntries.class))); + oneOf(m.mockResultSender).lastResult(with(any(TopEntriesCollector.class))); will(new CustomAction("collectResult") { @Override public Object invoke(Invocation invocation) throws Throwable { - result.set((TopEntries) invocation.getParameter(0)); + result.set((TopEntriesCollector) invocation.getParameter(0)); return null; } }); @@ -96,14 +96,14 @@ public class LuceneQueryFunctionJUnitTest { function.setRepositoryManager(m.mockRepoManager); function.execute(m.mockContext); - List<EntryScore> hits = result.get().getHits(); + List<EntryScore> hits = result.get().getEntries().getHits(); assertEquals(5, hits.size()); - TopEntriesJUnitTest.verifyResultOrder(result.get().getHits(), r1_1, r2_1, r1_2, r2_2, r1_3); + TopEntriesJUnitTest.verifyResultOrder(result.get().getEntries().getHits(), r1_1, r2_1, r1_2, r2_2, r1_3); } @Test public void testResultLimitClause() throws Exception { - final AtomicReference<TopEntries> result = new AtomicReference<>(); + final AtomicReference<TopEntriesCollector> result = new AtomicReference<>(); final QueryMocks m = new QueryMocks(); mocker.checking(new Expectations() { @@ -147,11 +147,11 @@ public class LuceneQueryFunctionJUnitTest { } }); - oneOf(m.mockResultSender).lastResult(with(any(TopEntries.class))); + oneOf(m.mockResultSender).lastResult(with(any(TopEntriesCollector.class))); will(new CustomAction("collectResult") { @Override public 
Object invoke(Invocation invocation) throws Throwable { - result.set((TopEntries) invocation.getParameter(0)); + result.set((TopEntriesCollector) invocation.getParameter(0)); return null; } }); @@ -162,9 +162,9 @@ public class LuceneQueryFunctionJUnitTest { function.setRepositoryManager(m.mockRepoManager); function.execute(m.mockContext); - List<EntryScore> hits = result.get().getHits(); + List<EntryScore> hits = result.get().getEntries().getHits(); assertEquals(3, hits.size()); - TopEntriesJUnitTest.verifyResultOrder(result.get().getHits(), r1_1, r2_1, r1_2); + TopEntriesJUnitTest.verifyResultOrder(result.get().getEntries().getHits(), r1_1, r2_1, r1_2); } @Test @@ -197,7 +197,7 @@ public class LuceneQueryFunctionJUnitTest { Collection<IndexResultCollector> collectors = (Collection<IndexResultCollector>) invocation.getParameter(0); assertEquals(1, collectors.size()); assertEquals(m.mockCollector, collectors.iterator().next()); - return new TopEntries(); + return new TopEntriesCollector(null); } }); @@ -213,7 +213,7 @@ public class LuceneQueryFunctionJUnitTest { } }); - oneOf(m.mockResultSender).lastResult(with(any(TopEntries.class))); + oneOf(m.mockResultSender).lastResult(with(any(TopEntriesCollector.class))); } }); @@ -294,7 +294,7 @@ public class LuceneQueryFunctionJUnitTest { will(returnValue(m.mockCollector)); oneOf(m.mockManager).reduce(with(any(Collection.class))); will(throwException(new IOException())); - + oneOf(m.mockRepoManager).getRepositories(m.mockRegion, m.mockContext); m.repos.remove(1); will(returnValue(m.repos)); @@ -303,13 +303,13 @@ public class LuceneQueryFunctionJUnitTest { oneOf(m.mockResultSender).sendException(with(any(IOException.class))); } }); - + LuceneQueryFunction function = new LuceneQueryFunction(); function.setRepositoryManager(m.mockRepoManager); - + function.execute(m.mockContext); } - + @Test public void testQueryFunctionId() { String id = new LuceneQueryFunction().getId(); @@ -318,7 +318,7 @@ public class 
LuceneQueryFunctionJUnitTest { class QueryMocks { RegionFunctionContext mockContext = mocker.mock(RegionFunctionContext.class); - ResultSender<TopEntries> mockResultSender = mocker.mock(ResultSender.class); + ResultSender<TopEntriesCollector> mockResultSender = mocker.mock(ResultSender.class); Region<Object, Object> mockRegion = mocker.mock(Region.class); RepositoryManager mockRepoManager = mocker.mock(RepositoryManager.class); @@ -329,7 +329,6 @@ public class LuceneQueryFunctionJUnitTest { CollectorManager mockManager = mocker.mock(CollectorManager.class); IndexResultCollector mockCollector = mocker.mock(IndexResultCollector.class); - QueryMocks() { repos.add(mockRepository1); repos.add(mockRepository2); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java index 4766220..3cf622c 100644 --- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java +++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java @@ -8,7 +8,6 @@ import java.util.List; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector; import com.gemstone.gemfire.test.junit.categories.UnitTest; @Category(UnitTest.class) @@ -43,14 +42,14 @@ public class TopEntriesCollectorJUnitTest { c3.collect(r3_2.key, r3_2.score); c3.collect(r3_3.key, r3_3.score); - List<IndexResultCollector> collectors = new ArrayList<>(); + 
List<TopEntriesCollector> collectors = new ArrayList<>(); collectors.add(c1); collectors.add(c2); collectors.add(c3); - TopEntries entries = manager.reduce(collectors); - assertEquals(8, entries.getHits().size()); - TopEntriesJUnitTest.verifyResultOrder(entries.getHits(), r1_1, r2_1, r3_1, r1_2, r2_2, r3_2, r1_3, r3_3); + TopEntriesCollector entries = manager.reduce(collectors); + assertEquals(8, entries.getEntries().getHits().size()); + TopEntriesJUnitTest.verifyResultOrder(entries.getEntries().getHits(), r1_1, r2_1, r3_1, r1_2, r2_2, r3_2, r1_3, r3_3); } @Test
