GEODE-11: Keep results in collector unmodified. Previously, the TopEntriesCollectorManager.reduce method modified the hits collected in TopEntriesCollector. As a result, calling getResults twice would fail. The collected entry set needs to be preserved without making a copy.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/58ddc22e Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/58ddc22e Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/58ddc22e Branch: refs/heads/develop Commit: 58ddc22e6d321abf80e9e674c3abea2d469b847e Parents: 75abaf5 Author: Ashvin Agrawal <[email protected]> Authored: Fri Oct 2 11:37:52 2015 -0700 Committer: Ashvin Agrawal <[email protected]> Committed: Fri Oct 2 11:37:52 2015 -0700 ---------------------------------------------------------------------- .../distributed/TopEntriesCollectorManager.java | 49 ++++++++++---- .../TopEntriesFunctionCollector.java | 9 ++- .../TopEntriesCollectorJUnitTest.java | 71 +++++++++++++++----- .../TopEntriesFunctionCollectorJUnitTest.java | 18 +++++ 4 files changed, 117 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/58ddc22e/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java index 37631c6..417d80f 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java @@ -54,41 +54,42 @@ public class TopEntriesCollectorManager implements CollectorManager<TopEntriesCo if (collectors.isEmpty()) { return mergedResult; } - + final EntryScoreComparator scoreComparator = new TopEntries().new 
EntryScoreComparator(); // orders a entry with higher score above a doc with lower score - Comparator<List<EntryScore>> entryListComparator = new Comparator<List<EntryScore>>() { + Comparator<ListScanner> entryListComparator = new Comparator<ListScanner>() { @Override - public int compare(List<EntryScore> l1, List<EntryScore> l2) { - EntryScore o1 = l1.get(0); - EntryScore o2 = l2.get(0); + public int compare(ListScanner l1, ListScanner l2) { + EntryScore o1 = l1.peek(); + EntryScore o2 = l2.peek(); return scoreComparator.compare(o1, o2); } }; // The queue contains iterators for all bucket results. The queue puts the entry with the highest score at the head // using score comparator. - PriorityQueue<List<EntryScore>> entryListsPriorityQueue; - entryListsPriorityQueue = new PriorityQueue<List<EntryScore>>(collectors.size(), Collections.reverseOrder(entryListComparator)); + PriorityQueue<ListScanner> entryListsPriorityQueue; + entryListsPriorityQueue = new PriorityQueue<ListScanner>(collectors.size(), + Collections.reverseOrder(entryListComparator)); for (IndexResultCollector collector : collectors) { logger.debug("Number of entries found in collector {} is {}", collector.getName(), collector.size()); if (collector.size() > 0) { - entryListsPriorityQueue.add(((TopEntriesCollector) collector).getEntries().getHits()); + entryListsPriorityQueue.add(new ListScanner(((TopEntriesCollector) collector).getEntries().getHits())); } } logger.debug("Only {} count of entries will be reduced. 
Other entries will be ignored", limit); while (entryListsPriorityQueue.size() > 0 && limit > mergedResult.size()) { - List<EntryScore> list = entryListsPriorityQueue.remove(); - EntryScore entry = list.remove(0); + ListScanner scanner = entryListsPriorityQueue.remove(); + EntryScore entry = scanner.next(); mergedResult.collect(entry); - if (list.size() > 0) { - entryListsPriorityQueue.add(list); + if (scanner.hasNext()) { + entryListsPriorityQueue.add(scanner); } } @@ -96,6 +97,30 @@ public class TopEntriesCollectorManager implements CollectorManager<TopEntriesCo return mergedResult; } + /* + * Utility class to iterate on hits without modifying it + */ + static class ListScanner { + private List<EntryScore> hits; + private int index = 0; + + ListScanner(List<EntryScore> hits) { + this.hits = hits; + } + + boolean hasNext() { + return index < hits.size(); + } + + EntryScore peek() { + return hits.get(index); + } + + EntryScore next() { + return hits.get(index++); + } + } + @Override public Version[] getSerializationVersions() { return null; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/58ddc22e/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java index 26586d9..032e136 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java @@ -40,6 +40,7 @@ public class TopEntriesFunctionCollector implements ResultCollector<TopEntriesCo private static final Logger logger = LogService.getLogger(); 
private final Collection<TopEntriesCollector> subResults = new ArrayList<>(); + private TopEntriesCollector mergedResults; public TopEntriesFunctionCollector() { this(null, null); @@ -99,9 +100,13 @@ public class TopEntriesFunctionCollector implements ResultCollector<TopEntriesCo private TopEntries aggregateResults() { synchronized (subResults) { + if (mergedResults != null) { + return mergedResults.getEntries(); + } + try { - TopEntriesCollector finalResult = manager.reduce(subResults); - return finalResult.getEntries(); + mergedResults = manager.reduce(subResults); + return mergedResults.getEntries(); } catch (IOException e) { logger.debug("Error while merging function execution results", e); throw new FunctionException(e); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/58ddc22e/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java index 50c9f92..8acdd5a 100644 --- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java +++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java @@ -5,31 +5,38 @@ import static org.junit.Assert.assertEquals; import java.util.ArrayList; import java.util.List; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import com.gemstone.gemfire.CopyHelper; import com.gemstone.gemfire.cache.lucene.internal.LuceneServiceImpl; +import com.gemstone.gemfire.cache.lucene.internal.distributed.TopEntriesCollectorManager.ListScanner; import 
com.gemstone.gemfire.test.junit.categories.UnitTest; @Category(UnitTest.class) public class TopEntriesCollectorJUnitTest { + EntryScore r1_1 = new EntryScore("1-1", .9f); + EntryScore r1_2 = new EntryScore("1-2", .7f); + EntryScore r1_3 = new EntryScore("1-3", .5f); - @Test - public void testReduce() throws Exception { - EntryScore r1_1 = new EntryScore("1-1", .9f); - EntryScore r1_2 = new EntryScore("1-2", .7f); - EntryScore r1_3 = new EntryScore("1-3", .5f); + EntryScore r2_1 = new EntryScore("2-1", .85f); + EntryScore r2_2 = new EntryScore("2-2", .65f); - EntryScore r2_1 = new EntryScore("2-1", .85f); - EntryScore r2_2 = new EntryScore("2-2", .65f); + EntryScore r3_1 = new EntryScore("3-1", .8f); + EntryScore r3_2 = new EntryScore("3-2", .6f); + EntryScore r3_3 = new EntryScore("3-3", .4f); - EntryScore r3_1 = new EntryScore("3-1", .8f); - EntryScore r3_2 = new EntryScore("3-2", .6f); - EntryScore r3_3 = new EntryScore("3-3", .4f); + TopEntriesCollectorManager manager; - TopEntriesCollectorManager manager = new TopEntriesCollectorManager(); + @Before + public void setup() { + manager = new TopEntriesCollectorManager(); + } + @Test + public void testReduce() throws Exception { TopEntriesCollector c1 = manager.newCollector("c1"); c1.collect(r1_1.getKey(), r1_1.getScore()); c1.collect(r1_2.getKey(), r1_2.getScore()); @@ -49,11 +56,17 @@ public class TopEntriesCollectorJUnitTest { collectors.add(c2); collectors.add(c3); - TopEntriesCollector entries = manager.reduce(collectors); - assertEquals(8, entries.getEntries().getHits().size()); - TopEntriesJUnitTest.verifyResultOrder(entries.getEntries().getHits(), r1_1, r2_1, r3_1, r1_2, r2_2, r3_2, r1_3, r3_3); + TopEntriesCollector hits = manager.reduce(collectors); + assertEquals(8, hits.getEntries().getHits().size()); + TopEntriesJUnitTest.verifyResultOrder(hits.getEntries().getHits(), r1_1, r2_1, r3_1, r1_2, r2_2, r3_2, r1_3, r3_3); + + // input collector should not change + assertEquals(3, 
c1.getEntries().getHits().size()); + assertEquals(2, c2.getEntries().getHits().size()); + assertEquals(3, c3.getEntries().getHits().size()); + // TopEntriesJUnitTest.verifyResultOrder(c1.getEntries().getHits(), r1_1, r2_1, r3_1, r1_2, r2_2, r3_2, r1_3, r3_3); } - + @Test public void testInitialization() { TopEntriesCollector collector = new TopEntriesCollector("name"); @@ -69,7 +82,7 @@ public class TopEntriesCollectorJUnitTest { assertEquals("id", copy.getId()); assertEquals(213, copy.getLimit()); } - + @Test public void testCollectorSerialization() { LuceneServiceImpl.registerDataSerializables(); @@ -78,4 +91,30 @@ public class TopEntriesCollectorJUnitTest { assertEquals("collector", copy.getName()); assertEquals(345, copy.getEntries().getLimit()); } + + @Test + public void testScannerDoesNotMutateHits() { + TopEntriesCollector c1 = manager.newCollector("c1"); + c1.collect(r1_1.getKey(), r1_1.getScore()); + c1.collect(r1_2.getKey(), r1_2.getScore()); + c1.collect(r1_3.getKey(), r1_3.getScore()); + + assertEquals(3, c1.getEntries().getHits().size()); + TopEntriesJUnitTest.verifyResultOrder(c1.getEntries().getHits(), r1_1, r1_2, r1_3); + + ListScanner scanner = new ListScanner(c1.getEntries().getHits()); + Assert.assertTrue(scanner.hasNext()); + assertEquals(r1_1.getKey(), scanner.peek().getKey()); + assertEquals(r1_1.getKey(), scanner.next().getKey()); + Assert.assertTrue(scanner.hasNext()); + assertEquals(r1_2.getKey(), scanner.peek().getKey()); + assertEquals(r1_2.getKey(), scanner.next().getKey()); + Assert.assertTrue(scanner.hasNext()); + assertEquals(r1_3.getKey(), scanner.peek().getKey()); + assertEquals(r1_3.getKey(), scanner.next().getKey()); + Assert.assertFalse(scanner.hasNext()); + + assertEquals(3, c1.getEntries().getHits().size()); + TopEntriesJUnitTest.verifyResultOrder(c1.getEntries().getHits(), r1_1, r1_2, r1_3); + } } 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/58ddc22e/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java index 5f7dc3d..f17200b 100644 --- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java +++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java @@ -211,6 +211,24 @@ public class TopEntriesFunctionCollectorJUnitTest { } @Test + public void getResultsTwice() throws Exception { + TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(); + collector.addResult(null, result1); + collector.addResult(null, result2); + collector.endResults(); + + TopEntries merged = collector.getResult(); + Assert.assertNotNull(merged); + assertEquals(4, merged.size()); + TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2); + + merged = collector.getResult(); + Assert.assertNotNull(merged); + assertEquals(4, merged.size()); + TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2); + } + + @Test public void mergeResultsCustomCollectorManager() throws Exception { TopEntries resultEntries = new TopEntries(); TopEntriesCollector mockCollector = mock(TopEntriesCollector.class);
