GEODE-11: Add search function ResultCollector https://reviews.apache.org/r/38320/
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/026271b0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/026271b0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/026271b0 Branch: refs/heads/feature/GEODE-11 Commit: 026271b0ca9fac716285cdcd085a22b1f18c4e9f Parents: 01c4bc9 Author: Ashvin Agrawal <[email protected]> Authored: Fri Sep 11 11:24:05 2015 -0700 Committer: Ashvin Agrawal <[email protected]> Committed: Fri Sep 11 14:55:18 2015 -0700 ---------------------------------------------------------------------- .../distributed/TopEntriesCollectorManager.java | 2 +- .../TopEntriesFunctionCollector.java | 132 +++++++++ .../TopEntriesFunctionCollectorJUnitTest.java | 270 +++++++++++++++++++ 3 files changed, 403 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/026271b0/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java index 21e11ab..f269b2b 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java @@ -33,7 +33,7 @@ public class TopEntriesCollectorManager implements CollectorManager<TopEntriesCo } public TopEntriesCollectorManager(String id, int resultLimit) { - this.limit = resultLimit < 0 ? LuceneQueryFactory.DEFAULT_LIMIT : resultLimit; + this.limit = resultLimit <= 0 ? LuceneQueryFactory.DEFAULT_LIMIT : resultLimit; this.id = id == null ? String.valueOf(this.hashCode()) : id; logger.debug("Max count of entries to be produced by {} is {}", id, limit); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/026271b0/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java new file mode 100644 index 0000000..3b39538 --- /dev/null +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java @@ -0,0 +1,132 @@ +package com.gemstone.gemfire.cache.lucene.internal.distributed; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.cache.execute.FunctionException; +import com.gemstone.gemfire.cache.execute.ResultCollector; +import com.gemstone.gemfire.cache.lucene.LuceneQuery; +import com.gemstone.gemfire.distributed.DistributedMember; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.logging.LogService; + +/** + * A {@link ResultCollector} implementation for collecting and ordering {@link TopEntries}. The {@link TopEntries} + * objects will be created by members when a {@link LuceneQuery} is executed on the local data hosted by the member. The + * member executing this logic must have sufficient space to hold all the {@link EntryScore} documents returned from the + * members. + * + * <p> + * This class will perform a lazy merge operation. Merge will take place if the merge {@link ResultCollector#getResult} + * is invoked or if the combined result size is more than the limit set. In the later case, merge will be performed + * whenever {@link ResultCollector#addResult} is invoked. + */ +public class TopEntriesFunctionCollector implements ResultCollector<TopEntriesCollector, TopEntries> { + // Use this instance to perform reduce operation + final CollectorManager<TopEntriesCollector> manager; + + // latch to wait till all results are collected + private final CountDownLatch waitForResults = new CountDownLatch(1); + + final String id; + + // Instance of gemfire cache to check status and other utility methods + final private GemFireCacheImpl cache; + private static final Logger logger = LogService.getLogger(); + + private final Collection<TopEntriesCollector> subResults = new ArrayList<>(); + + public TopEntriesFunctionCollector() { + this(null, null); + } + + public TopEntriesFunctionCollector(CollectorManager<TopEntriesCollector> manager) { + this(manager, null); + } + + public TopEntriesFunctionCollector(CollectorManager<TopEntriesCollector> manager, GemFireCacheImpl cache) { + this.cache = cache; + id = cache == null ? String.valueOf(this.hashCode()) : cache.getName(); + this.manager = manager == null ? new TopEntriesCollectorManager(id) : manager; + } + + @Override + public TopEntries getResult() throws FunctionException { + try { + waitForResults.await(); + } catch (InterruptedException e) { + logger.debug("Interrupted while waiting for result collection", e); + Thread.currentThread().interrupt(); + if (cache != null) { + cache.getCancelCriterion().checkCancelInProgress(e); + } + throw new FunctionException(e); + } + + return aggregateResults(); + } + + @Override + public TopEntries getResult(long timeout, TimeUnit unit) throws FunctionException { + try { + boolean result = waitForResults.await(timeout, unit); + if (!result) { + throw new FunctionException("Did not receive results from all members within wait time"); + } + } catch (InterruptedException e) { + logger.debug("Interrupted while waiting for result collection", e); + Thread.currentThread().interrupt(); + if (cache != null) { + cache.getCancelCriterion().checkCancelInProgress(e); + } + throw new FunctionException(e); + } + + return aggregateResults(); + } + + private TopEntries aggregateResults() { + synchronized (subResults) { + try { + TopEntriesCollector finalResult = manager.reduce(subResults); + return finalResult.getEntries(); + } catch (IOException e) { + logger.debug("Error while merging function execution results", e); + throw new FunctionException(e); + } + } + } + + @Override + public void endResults() { + synchronized (subResults) { + waitForResults.countDown(); + } + } + + @Override + public void clearResults() { + synchronized (subResults) { + if (waitForResults.getCount() == 0) { + throw new IllegalStateException("This collector is closed and cannot accept anymore results"); + } + + subResults.clear(); + } + } + + @Override + public void addResult(DistributedMember memberID, TopEntriesCollector resultOfSingleExecution) { + synchronized (subResults) { + if (waitForResults.getCount() == 0) { + throw new IllegalStateException("This collector is closed and cannot accept anymore results"); + } + subResults.add(resultOfSingleExecution); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/026271b0/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java new file mode 100644 index 0000000..1c42912 --- /dev/null +++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java @@ -0,0 +1,270 @@ +package com.gemstone.gemfire.cache.lucene.internal.distributed; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; + +import com.gemstone.gemfire.CancelCriterion; +import com.gemstone.gemfire.cache.execute.FunctionException; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class TopEntriesFunctionCollectorJUnitTest { + EntryScore r1_1, r1_2, r2_1, r2_2; + TopEntriesCollector result1, result2; + + @Before + public void initializeCommonObjects() { + r1_1 = new EntryScore("3", .9f); + r1_2 = new EntryScore("1", .8f); + r2_1 = new EntryScore("2", 0.85f); + r2_2 = new EntryScore("4", 0.1f); + + result1 = new TopEntriesCollector(null); + result1.collect(r1_1); + result1.collect(r1_2); + + result2 = new TopEntriesCollector(null); + result2.collect(r2_1); + result2.collect(r2_2); + } + + @Test + public void testGetResultsBlocksTillEnd() throws Exception { + final TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(); + final CountDownLatch insideThread = new CountDownLatch(1); + final CountDownLatch resultReceived = new CountDownLatch(1); + Thread resultClient = new Thread(new Runnable() { + @Override + public void run() { + insideThread.countDown(); + collector.getResult(); + resultReceived.countDown(); + } + }); + resultClient.start(); + + insideThread.await(1, TimeUnit.SECONDS); + assertEquals(0, insideThread.getCount()); + assertEquals(1, resultReceived.getCount()); + + collector.endResults(); + resultReceived.await(1, TimeUnit.SECONDS); + assertEquals(0, resultReceived.getCount()); + } + + @Test + public void testGetResultsTimedWait() throws Exception { + final TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(); + collector.addResult(null, result1); + collector.addResult(null, result2); + + final CountDownLatch insideThread = new CountDownLatch(1); + final CountDownLatch resultReceived = new CountDownLatch(1); + + final AtomicReference<TopEntries> result = new AtomicReference<>(); + + Thread resultClient = new Thread(new Runnable() { + @Override + public void run() { + insideThread.countDown(); + result.set(collector.getResult(1, TimeUnit.SECONDS)); + resultReceived.countDown(); + } + }); + resultClient.start(); + + insideThread.await(1, TimeUnit.SECONDS); + assertEquals(0, insideThread.getCount()); + assertEquals(1, resultReceived.getCount()); + + collector.endResults(); + + resultReceived.await(1, TimeUnit.SECONDS); + assertEquals(0, resultReceived.getCount()); + + TopEntries merged = result.get(); + assertEquals(4, merged.size()); + TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2); + } + + @Test(expected = FunctionException.class) + public void testGetResultsWaitInterrupted() throws Exception { + interruptWhileWaiting(false); + } + + @Test(expected = FunctionException.class) + public void testGetResultsTimedWaitInterrupted() throws Exception { + interruptWhileWaiting(false); + } + + private void interruptWhileWaiting(final boolean timedWait) throws InterruptedException, Exception { + GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class); + final TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(null, mockCache); + + final CountDownLatch insideThread = new CountDownLatch(1); + final CountDownLatch endGetResult = new CountDownLatch(1); + final AtomicReference<Exception> exception = new AtomicReference<>(); + + Thread resultClient = new Thread(new Runnable() { + @Override + public void run() { + insideThread.countDown(); + try { + if (timedWait) { + collector.getResult(1, TimeUnit.SECONDS); + } else { + collector.getResult(); + } + } catch (FunctionException e) { + endGetResult.countDown(); + exception.set(e); + } + } + }); + resultClient.start(); + + insideThread.await(1, TimeUnit.SECONDS); + assertEquals(0, insideThread.getCount()); + assertEquals(1, endGetResult.getCount()); + + CancelCriterion mockCriterion = mock(CancelCriterion.class); + when(mockCache.getCancelCriterion()).thenReturn(mockCriterion); + resultClient.interrupt(); + endGetResult.await(1, TimeUnit.SECONDS); + assertEquals(0, endGetResult.getCount()); + throw exception.get(); + } + + @Test(expected = FunctionException.class) + public void expectErrorAfterWaitTime() throws Exception { + final TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(null); + + final CountDownLatch insideThread = new CountDownLatch(1); + final CountDownLatch endGetResult = new CountDownLatch(1); + final AtomicReference<Exception> exception = new AtomicReference<>(); + + Thread resultClient = new Thread(new Runnable() { + @Override + public void run() { + insideThread.countDown(); + try { + collector.getResult(10, TimeUnit.MILLISECONDS); + } catch (FunctionException e) { + endGetResult.countDown(); + exception.set(e); + } + } + }); + resultClient.start(); + + insideThread.await(1, TimeUnit.SECONDS); + assertEquals(0, insideThread.getCount()); + assertEquals(1, endGetResult.getCount()); + + endGetResult.await(1, TimeUnit.SECONDS); + assertEquals(0, endGetResult.getCount()); + throw exception.get(); + } + + @Test + public void mergeResultsDefaultCollectorManager() throws Exception { + TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(null); + collector.addResult(null, result1); + collector.addResult(null, result2); + collector.endResults(); + + TopEntries merged = collector.getResult(); + Assert.assertNotNull(merged); + assertEquals(4, merged.size()); + TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2); + } + + @Test + public void mergeResultsCustomCollectorManager() throws Exception { + TopEntries resultEntries = new TopEntries(); + TopEntriesCollector mockCollector = mock(TopEntriesCollector.class); + Mockito.doReturn(resultEntries).when(mockCollector).getEntries(); + + CollectorManager<TopEntriesCollector> mockManager = mock(CollectorManager.class); + Mockito.doReturn(mockCollector).when(mockManager) + .reduce(Mockito.argThat(new ArgumentMatcher<Collection<TopEntriesCollector>>() { + @Override + public boolean matches(Object argument) { + Collection<TopEntriesCollector> collectors = (Collection<TopEntriesCollector>) argument; + return collectors.contains(result1) && collectors.contains(result2); + } + })); + + TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(mockManager); + collector.addResult(null, result1); + collector.addResult(null, result2); + collector.endResults(); + + TopEntries merged = collector.getResult(); + assertEquals(resultEntries, merged); + } + + @Test + public void mergeAfterClearResults() throws Exception { + TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(null); + collector.addResult(null, result1); + collector.clearResults(); + collector.addResult(null, result2); + collector.endResults(); + + TopEntries merged = collector.getResult(); + Assert.assertNotNull(merged); + assertEquals(2, merged.size()); + TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r2_1, r2_2); + } + + @Test(expected = FunctionException.class) + public void testExceptionDuringMerge() throws Exception { + TopEntriesCollectorManager mockManager = mock(TopEntriesCollectorManager.class); + Mockito.doThrow(new IOException()).when(mockManager).reduce(any(Collection.class)); + + TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(mockManager); + collector.endResults(); + collector.getResult(); + } + + @Test(expected = IllegalStateException.class) + public void addResultDisallowedAfterEndResult() { + TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(); + collector.endResults(); + collector.addResult(null, new TopEntriesCollector(null)); + } + + @Test(expected = IllegalStateException.class) + public void clearResultDisallowedAfterEndResult() { + TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(); + collector.endResults(); + collector.clearResults(); + } + + @Test + public void testCollectorName() { + GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class); + Mockito.doReturn("server").when(mockCache).getName(); + + TopEntriesFunctionCollector function = new TopEntriesFunctionCollector(null, mockCache); + assertEquals("server", function.id); + } + +}
