javanna commented on code in PR #16247: URL: https://github.com/apache/lucene/pull/16247#discussion_r3443454830
########## lucene/core/src/test/org/apache/lucene/search/TestCachingCollectorManager.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.search; + +import java.io.IOException; +import org.apache.lucene.document.Document; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.index.RandomIndexWriter; +import org.apache.lucene.tests.util.LuceneTestCase; + +public class TestCachingCollectorManager extends LuceneTestCase { + + public void testCacheOverflow() throws IOException { + Directory dir = newDirectory(); + RandomIndexWriter iw = new RandomIndexWriter(random(), dir); + for (int i = 0; i < atLeast(10); i++) { + iw.addDocument(new Document()); + } + IndexSearcher searcher = newSearcher(iw.getReader()); + iw.close(); + + CachingCollectorManager<TopScoreDocCollector, TopDocs> caching = + new CachingCollectorManager<>( + new TopScoreDocCollectorManager(10, Integer.MAX_VALUE), false, null, 0); + + searcher.search(MatchAllDocsQuery.INSTANCE, caching); + assertFalse(caching.isCached()); + assertThrows( + IllegalStateException.class, + () -> caching.replay(new TopScoreDocCollectorManager(10, Integer.MAX_VALUE))); + + searcher.getIndexReader().close(); + dir.close(); + } + + public void testNotCachedBeforeSearch() { + CachingCollectorManager<TopScoreDocCollector, TopDocs> caching = + new CachingCollectorManager<>( + new TopScoreDocCollectorManager(10, Integer.MAX_VALUE), false, null, Integer.MAX_VALUE); + assertFalse(caching.isCached()); + + assertThrows( + IllegalStateException.class, + () -> caching.replay(new TopScoreDocCollectorManager(10, Integer.MAX_VALUE))); + } + + public void testConstructorValidation() { + assertThrows( + IllegalArgumentException.class, + () -> + new CachingCollectorManager<>( + new TopScoreDocCollectorManager(10, Integer.MAX_VALUE), false, null, null)); + } Review Comment: could we add also a test for the happy path ? ########## lucene/grouping/src/test/org/apache/lucene/search/grouping/TestGroupingSearch.java: ########## @@ -110,10 +110,17 @@ public void testBasic() throws Exception { // 6 -- no author field doc = new Document(); doc.add(new TextField("content", "random word stuck in alot of other text", Field.Store.YES)); - doc.add(new Field("id", "6", customType)); - doc.add(new StringField("groupend", "x", Field.Store.NO)); + doc.add(new Field("id", "7", customType)); + documents.add(doc); - w.addDocument(doc); + // 7 -- no match document + doc = new Document(); + doc.add(new TextField("content", "no-match", Field.Store.YES)); + doc.add(new Field("id", "8", customType)); + doc.add(new StringField("groupend", "x", Field.Store.NO)); + documents.add(doc); + w.addDocuments(documents); + documents.clear(); Review Comment: Sorry , I don't follow. there's a doc above that goes from id 6 to id 7 and a new doc added with id 8 (the comment seems wrong -- /// 7 - no match document) ########## lucene/core/src/java/org/apache/lucene/search/CachingCollectorManager.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.search; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * A {@link CollectorManager} that wraps a delegate {@link CollectorManager} and caches all + * collected documents (and optionally scores) per slice, so they can be replayed to a second-pass + * {@link CollectorManager} without re-running the query. + * + * <p>One {@link CachingCollector} is created per slice. During {@link #replay}, each cached slice + * is replayed into a fresh second-pass collector, and all second-pass collectors are reduced + * together. This works correctly with both sequential and concurrent search. + * + * <p>Example usage: + * + * <pre class="prettyprint"> + * CachingCollectorManager<C1, R1> caching = new CachingCollectorManager<>( + * firstPassManager, cacheScores, maxRAMMB, null); + * R1 firstResult = searcher.search(query, caching); + * + * if (caching.isCached()) { + * R2 secondResult = caching.replay(secondPassManager); + * } else { + * // cache overflowed — re-run the query + * R2 secondResult = searcher.search(query, secondPassManager); + * } + * </pre> + * + * @lucene.experimental + */ +public class CachingCollectorManager<C extends Collector, R> + implements CollectorManager<CachingCollector, R> { + + private final CollectorManager<C, R> delegate; + private final boolean cacheScores; + private final Double maxRAMMB; + private final Integer maxDocsToCache; + + // One CachingCollector per slice + private final List<CachingCollector> cachingCollectors = new ArrayList<>(); + + /** + * @param delegate the first-pass {@link CollectorManager} + * @param cacheScores whether to cache scores in addition to document IDs + * @param maxRAMMB the maximum RAM in MB to use per slice cache, or null if using maxDocsToCache + * @param maxDocsToCache the maximum number of documents to cache per slice, or null if using + * maxRAMMB + */ + public CachingCollectorManager( + CollectorManager<C, R> delegate, + boolean cacheScores, + Double maxRAMMB, + Integer maxDocsToCache) { + if (maxRAMMB == null && maxDocsToCache == null) { + throw new IllegalArgumentException("Either maxRAMMB or maxDocsToCache must be set"); + } + this.delegate = delegate; + this.cacheScores = cacheScores; + this.maxRAMMB = maxRAMMB; + this.maxDocsToCache = maxDocsToCache; + } + + @Override + public CachingCollector newCollector() throws IOException { + C collector = delegate.newCollector(); + CachingCollector cache = + maxDocsToCache != null + ? CachingCollector.create(collector, cacheScores, maxDocsToCache) + : CachingCollector.create(collector, cacheScores, maxRAMMB); + cachingCollectors.add(cache); + return cache; + } + + @Override + @SuppressWarnings("unchecked") + public R reduce(Collection<CachingCollector> collectors) throws IOException { + List<C> originals = new ArrayList<>(collectors.size()); + for (CachingCollector cache : collectors) { + originals.add((C) cache.in); + } + return delegate.reduce(originals); + } + + /** + * Returns {@code true} if the search has been run and all per-slice caches are intact (none + * overflowed their RAM/doc budget). Returns {@code false} if the search has not yet been run or + * any cache overflowed. + */ + public boolean isCached() { + return !cachingCollectors.isEmpty() + && cachingCollectors.stream().allMatch(CachingCollector::isCached); Review Comment: I hinted at t in a previous comment: I believe we should make the cached flag in the collector volatile, at the very least, to stay on the safe side and avoid visibility issues across threads. ########## lucene/grouping/src/test/org/apache/lucene/search/grouping/BaseGroupSelectorTestCase.java: ########## @@ -72,7 +73,6 @@ public void testSortByRelevance() throws IOException { TopDocs td = searcher.search(filtered, 10); assertScoreDocsEquals(topGroups.groups[i].scoreDocs(), td.scoreDocs); if (i == 0) { - assertEquals(td.scoreDocs[0].doc, topDoc.scoreDocs[0].doc); Review Comment: This is because there are docs with the same score returned, and previously we could rely on deterministic order in which docs are visited, while this is no longer true with concurrency? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
