javanna commented on code in PR #16247: URL: https://github.com/apache/lucene/pull/16247#discussion_r3412025202
########## lucene/core/src/java/org/apache/lucene/search/CachingCollectorManager.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.search; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * A {@link CollectorManager} that wraps a delegate {@link CollectorManager} and caches all + * collected documents (and optionally scores) per slice, so they can be replayed to a second-pass + * {@link CollectorManager} without re-running the query. + * + * <p>One {@link CachingCollector} is created per slice. During {@link #replay}, each cached slice + * is replayed into a fresh second-pass collector, and all second-pass collectors are reduced + * together. This works correctly with both sequential and concurrent search. 
+ * + * <p>Example usage: + * + * <pre class="prettyprint"> + * CachingCollectorManager<C1, R1> caching = new CachingCollectorManager<>( + * firstPassManager, cacheScores, maxRAMMB); + * R1 firstResult = searcher.search(query, caching); + * + * if (caching.isCached()) { + * R2 secondResult = caching.replay(secondPassManager); + * } else { + * // cache overflowed — re-run the query + * R2 secondResult = searcher.search(query, secondPassManager); + * } + * </pre> + * + * @lucene.experimental + */ +public class CachingCollectorManager<C extends Collector, R> implements CollectorManager<C, R> { + + private final CollectorManager<C, R> delegate; + private final boolean cacheScores; + private final Double maxRAMMB; + private final Integer maxDocsToCache; + + // One CachingCollector per slice, thread-safe for concurrent newCollector() calls. + private final List<CachingCollector> cachingCollectors = new CopyOnWriteArrayList<>(); + // The original unwrapped collectors + private final List<C> originalCollectors = new CopyOnWriteArrayList<>(); Review Comment: this does not need to handle concurrency, as newCollector is called from a single thread, the same that will call reduce? I wonder if this list is necessary too, given that reduce could loop through the collectors received as argument and retrieve the `in` collector, which though requires casting. 
########## lucene/grouping/src/test/org/apache/lucene/search/grouping/TestGroupingSearch.java: ########## @@ -110,10 +110,17 @@ public void testBasic() throws Exception { // 6 -- no author field doc = new Document(); doc.add(new TextField("content", "random word stuck in alot of other text", Field.Store.YES)); - doc.add(new Field("id", "6", customType)); - doc.add(new StringField("groupend", "x", Field.Store.NO)); + doc.add(new Field("id", "7", customType)); + documents.add(doc); - w.addDocument(doc); + // 7 -- no match document + doc = new Document(); + doc.add(new TextField("content", "no-match", Field.Store.YES)); + doc.add(new Field("id", "8", customType)); + doc.add(new StringField("groupend", "x", Field.Store.NO)); + documents.add(doc); + w.addDocuments(documents); + documents.clear(); Review Comment: could you expand on why these changes were needed? ########## lucene/core/src/java/org/apache/lucene/search/CachingCollectorManager.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.lucene.search; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * A {@link CollectorManager} that wraps a delegate {@link CollectorManager} and caches all + * collected documents (and optionally scores) per slice, so they can be replayed to a second-pass + * {@link CollectorManager} without re-running the query. + * + * <p>One {@link CachingCollector} is created per slice. During {@link #replay}, each cached slice + * is replayed into a fresh second-pass collector, and all second-pass collectors are reduced + * together. This works correctly with both sequential and concurrent search. + * + * <p>Example usage: + * + * <pre class="prettyprint"> + * CachingCollectorManager<C1, R1> caching = new CachingCollectorManager<>( + * firstPassManager, cacheScores, maxRAMMB); + * R1 firstResult = searcher.search(query, caching); + * + * if (caching.isCached()) { + * R2 secondResult = caching.replay(secondPassManager); + * } else { + * // cache overflowed — re-run the query + * R2 secondResult = searcher.search(query, secondPassManager); + * } + * </pre> + * + * @lucene.experimental + */ +public class CachingCollectorManager<C extends Collector, R> implements CollectorManager<C, R> { + + private final CollectorManager<C, R> delegate; + private final boolean cacheScores; + private final Double maxRAMMB; + private final Integer maxDocsToCache; + + // One CachingCollector per slice, thread-safe for concurrent newCollector() calls. 
+ private final List<CachingCollector> cachingCollectors = new CopyOnWriteArrayList<>(); + // The original unwrapped collectors + private final List<C> originalCollectors = new CopyOnWriteArrayList<>(); + + /** + * @param delegate the first-pass {@link CollectorManager} + * @param cacheScores whether to cache scores in addition to document IDs + * @param maxRAMMB the maximum RAM in MB to use per slice cache, or null if using maxDocsToCache + * @param maxDocsToCache the maximum number of documents to cache per slice, or null if using + * maxRAMMB + */ + public CachingCollectorManager( + CollectorManager<C, R> delegate, + boolean cacheScores, + Double maxRAMMB, + Integer maxDocsToCache) { + if (maxRAMMB == null && maxDocsToCache == null) { + throw new IllegalArgumentException("Either maxRAMMB or maxDocsToCache must be set"); + } + this.delegate = delegate; + this.cacheScores = cacheScores; + this.maxRAMMB = maxRAMMB; + this.maxDocsToCache = maxDocsToCache; + } + + @Override + public C newCollector() throws IOException { + C collector = delegate.newCollector(); + originalCollectors.add(collector); + CachingCollector cache = + maxDocsToCache != null + ? CachingCollector.create(collector, cacheScores, maxDocsToCache) + : CachingCollector.create(collector, cacheScores, maxRAMMB); + cachingCollectors.add(cache); + @SuppressWarnings("unchecked") + C wrapped = (C) cache; Review Comment: This unchecked cast is a bit of a red flag to me: I think that this CollectorManager should be typed differently: `CachingCollectorManager<C extends Collector, R> implements CollectorManager<CachingCollector, R>`. This way there is no cast needed in `newCollector`? That is also more in line with the actual implementation. An alternative would be to return cache.in, but that would also require casting. 
########## lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupingSearch.java: ########## @@ -69,17 +73,29 @@ public class GroupingSearch { * @param groupField The name of the field to group by. */ public GroupingSearch(String groupField) { - this(new TermGroupSelector(groupField), null); + this(() -> new TermGroupSelector(groupField), null); + } + + /** + * Constructs a <code>GroupingSearch</code> instance that groups documents using a {@link + * GroupSelector} factory. + * + * @param grouperFactory a factory that creates fresh {@link GroupSelector} instances + */ + public GroupingSearch(Supplier<GroupSelector<?>> grouperFactory) { + this(grouperFactory, null); } /** * Constructs a <code>GroupingSearch</code> instance that groups documents using a {@link * GroupSelector} * * @param groupSelector a {@link GroupSelector} that defines groups for this GroupingSearch + * @deprecated Use {@link #GroupingSearch(Supplier)} to provide a factory for concurrent search */ + @Deprecated public GroupingSearch(GroupSelector<?> groupSelector) { - this(groupSelector, null); + this(() -> groupSelector, null); Review Comment: This is unsafe, because the group selector will be shared across threads? Given this class is experimental, I would remove this constructor. I don't think we can leave it behind deprecated. ########## lucene/core/src/java/org/apache/lucene/search/CachingCollectorManager.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.search; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * A {@link CollectorManager} that wraps a delegate {@link CollectorManager} and caches all + * collected documents (and optionally scores) per slice, so they can be replayed to a second-pass + * {@link CollectorManager} without re-running the query. + * + * <p>One {@link CachingCollector} is created per slice. During {@link #replay}, each cached slice + * is replayed into a fresh second-pass collector, and all second-pass collectors are reduced + * together. This works correctly with both sequential and concurrent search. 
+ * + * <p>Example usage: + * + * <pre class="prettyprint"> + * CachingCollectorManager<C1, R1> caching = new CachingCollectorManager<>( + * firstPassManager, cacheScores, maxRAMMB); + * R1 firstResult = searcher.search(query, caching); + * + * if (caching.isCached()) { + * R2 secondResult = caching.replay(secondPassManager); + * } else { + * // cache overflowed — re-run the query + * R2 secondResult = searcher.search(query, secondPassManager); + * } + * </pre> + * + * @lucene.experimental + */ +public class CachingCollectorManager<C extends Collector, R> implements CollectorManager<C, R> { + + private final CollectorManager<C, R> delegate; + private final boolean cacheScores; + private final Double maxRAMMB; + private final Integer maxDocsToCache; + + // One CachingCollector per slice, thread-safe for concurrent newCollector() calls. + private final List<CachingCollector> cachingCollectors = new CopyOnWriteArrayList<>(); + // The original unwrapped collectors + private final List<C> originalCollectors = new CopyOnWriteArrayList<>(); + + /** + * @param delegate the first-pass {@link CollectorManager} + * @param cacheScores whether to cache scores in addition to document IDs + * @param maxRAMMB the maximum RAM in MB to use per slice cache, or null if using maxDocsToCache + * @param maxDocsToCache the maximum number of documents to cache per slice, or null if using + * maxRAMMB + */ + public CachingCollectorManager( + CollectorManager<C, R> delegate, + boolean cacheScores, + Double maxRAMMB, + Integer maxDocsToCache) { + if (maxRAMMB == null && maxDocsToCache == null) { + throw new IllegalArgumentException("Either maxRAMMB or maxDocsToCache must be set"); + } + this.delegate = delegate; + this.cacheScores = cacheScores; + this.maxRAMMB = maxRAMMB; + this.maxDocsToCache = maxDocsToCache; + } + + @Override + public C newCollector() throws IOException { + C collector = delegate.newCollector(); + originalCollectors.add(collector); + CachingCollector cache = + 
maxDocsToCache != null + ? CachingCollector.create(collector, cacheScores, maxDocsToCache) + : CachingCollector.create(collector, cacheScores, maxRAMMB); + cachingCollectors.add(cache); + @SuppressWarnings("unchecked") + C wrapped = (C) cache; + return wrapped; + } + + @Override + public R reduce(Collection<C> collectors) throws IOException { + return delegate.reduce(originalCollectors); + } + + /** + * Returns {@code true} if all per-slice caches are intact (none overflowed their RAM budget), + * meaning {@link #replay} can be called. + */ + public boolean isCached() { + return !cachingCollectors.isEmpty() + && cachingCollectors.stream().allMatch(CachingCollector::isCached); + } + + /** + * Replays each per-slice cache into a fresh second-pass collector, then reduces all results. + * + * @throws IllegalStateException if any slice cache is not available + */ + public <C2 extends Collector, R2> R2 replay(CollectorManager<C2, R2> secondPassManager) + throws IOException { + if (!isCached()) { + throw new IllegalStateException("cache is not available; re-run the query instead"); + } + List<C2> secondCollectors = new ArrayList<>(cachingCollectors.size()); Review Comment: I would add a check that throws illegal state also when the list is empty, that means newCollector has never been called? ########## lucene/core/src/java/org/apache/lucene/search/CachingCollectorManager.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.search; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * A {@link CollectorManager} that wraps a delegate {@link CollectorManager} and caches all + * collected documents (and optionally scores) per slice, so they can be replayed to a second-pass + * {@link CollectorManager} without re-running the query. + * + * <p>One {@link CachingCollector} is created per slice. During {@link #replay}, each cached slice + * is replayed into a fresh second-pass collector, and all second-pass collectors are reduced + * together. This works correctly with both sequential and concurrent search. 
+ * + * <p>Example usage: + * + * <pre class="prettyprint"> + * CachingCollectorManager<C1, R1> caching = new CachingCollectorManager<>( + * firstPassManager, cacheScores, maxRAMMB); + * R1 firstResult = searcher.search(query, caching); + * + * if (caching.isCached()) { + * R2 secondResult = caching.replay(secondPassManager); + * } else { + * // cache overflowed — re-run the query + * R2 secondResult = searcher.search(query, secondPassManager); + * } + * </pre> + * + * @lucene.experimental + */ +public class CachingCollectorManager<C extends Collector, R> implements CollectorManager<C, R> { + + private final CollectorManager<C, R> delegate; + private final boolean cacheScores; + private final Double maxRAMMB; + private final Integer maxDocsToCache; + + // One CachingCollector per slice, thread-safe for concurrent newCollector() calls. + private final List<CachingCollector> cachingCollectors = new CopyOnWriteArrayList<>(); Review Comment: looks like this list is needed for the replay functionality. Note that like we discussed in other PRs, newCollector is never called concurrently. It is called sequentially by the coordinating thread which will also call reduce at the end. What I do worry about when it comes to concurrency though is the isCached mutable flag, that gets modified by the worker threads and accessed by the main thread at the end. There isn't a concurrent access problem with it, but there may be visibility issues. The list does not need to handle concurrency though. ########## lucene/core/src/java/org/apache/lucene/search/CachingCollectorManager.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.search; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * A {@link CollectorManager} that wraps a delegate {@link CollectorManager} and caches all + * collected documents (and optionally scores) per slice, so they can be replayed to a second-pass + * {@link CollectorManager} without re-running the query. + * + * <p>One {@link CachingCollector} is created per slice. During {@link #replay}, each cached slice + * is replayed into a fresh second-pass collector, and all second-pass collectors are reduced + * together. This works correctly with both sequential and concurrent search. 
+ * + * <p>Example usage: + * + * <pre class="prettyprint"> + * CachingCollectorManager<C1, R1> caching = new CachingCollectorManager<>( + * firstPassManager, cacheScores, maxRAMMB); + * R1 firstResult = searcher.search(query, caching); + * + * if (caching.isCached()) { + * R2 secondResult = caching.replay(secondPassManager); + * } else { + * // cache overflowed — re-run the query + * R2 secondResult = searcher.search(query, secondPassManager); + * } + * </pre> + * + * @lucene.experimental + */ +public class CachingCollectorManager<C extends Collector, R> implements CollectorManager<C, R> { Review Comment: Could we add a new test class that verifies the basic functionality of this collector manager, outside of its employment in grouping search? ########## lucene/core/src/java/org/apache/lucene/search/CachingCollectorManager.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.lucene.search; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * A {@link CollectorManager} that wraps a delegate {@link CollectorManager} and caches all + * collected documents (and optionally scores) per slice, so they can be replayed to a second-pass + * {@link CollectorManager} without re-running the query. + * + * <p>One {@link CachingCollector} is created per slice. During {@link #replay}, each cached slice + * is replayed into a fresh second-pass collector, and all second-pass collectors are reduced + * together. This works correctly with both sequential and concurrent search. + * + * <p>Example usage: + * + * <pre class="prettyprint"> + * CachingCollectorManager<C1, R1> caching = new CachingCollectorManager<>( + * firstPassManager, cacheScores, maxRAMMB); + * R1 firstResult = searcher.search(query, caching); + * + * if (caching.isCached()) { + * R2 secondResult = caching.replay(secondPassManager); + * } else { + * // cache overflowed — re-run the query + * R2 secondResult = searcher.search(query, secondPassManager); + * } + * </pre> + * + * @lucene.experimental + */ +public class CachingCollectorManager<C extends Collector, R> implements CollectorManager<C, R> { + + private final CollectorManager<C, R> delegate; + private final boolean cacheScores; + private final Double maxRAMMB; + private final Integer maxDocsToCache; + + // One CachingCollector per slice, thread-safe for concurrent newCollector() calls. 
+ private final List<CachingCollector> cachingCollectors = new CopyOnWriteArrayList<>(); + // The original unwrapped collectors + private final List<C> originalCollectors = new CopyOnWriteArrayList<>(); + + /** + * @param delegate the first-pass {@link CollectorManager} + * @param cacheScores whether to cache scores in addition to document IDs + * @param maxRAMMB the maximum RAM in MB to use per slice cache, or null if using maxDocsToCache + * @param maxDocsToCache the maximum number of documents to cache per slice, or null if using + * maxRAMMB + */ + public CachingCollectorManager( + CollectorManager<C, R> delegate, + boolean cacheScores, + Double maxRAMMB, + Integer maxDocsToCache) { + if (maxRAMMB == null && maxDocsToCache == null) { + throw new IllegalArgumentException("Either maxRAMMB or maxDocsToCache must be set"); + } + this.delegate = delegate; + this.cacheScores = cacheScores; + this.maxRAMMB = maxRAMMB; + this.maxDocsToCache = maxDocsToCache; + } + + @Override + public C newCollector() throws IOException { + C collector = delegate.newCollector(); + originalCollectors.add(collector); + CachingCollector cache = + maxDocsToCache != null + ? CachingCollector.create(collector, cacheScores, maxDocsToCache) + : CachingCollector.create(collector, cacheScores, maxRAMMB); + cachingCollectors.add(cache); + @SuppressWarnings("unchecked") + C wrapped = (C) cache; + return wrapped; + } + + @Override + public R reduce(Collection<C> collectors) throws IOException { + return delegate.reduce(originalCollectors); + } + + /** + * Returns {@code true} if all per-slice caches are intact (none overflowed their RAM budget), + * meaning {@link #replay} can be called. + */ + public boolean isCached() { + return !cachingCollectors.isEmpty() Review Comment: is it expected that caching collectors may be empty? Or should we throw illegal state if it is when the method is called? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
