gaobinlong commented on code in PR #15936:
URL: https://github.com/apache/lucene/pull/15936#discussion_r3353123018


##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollectorManager.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search.grouping;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.lucene.search.CollectorManager;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.Weight;
+
+/**
+ * A {@link CollectorManager} for {@link BlockGroupingCollector} that merges 
results from multiple
+ * collectors into a single {@link TopGroups}. This is intended for use with 
concurrent search,
+ * where each segment is searched by a separate {@link BlockGroupingCollector}.

Review Comment:
   Changed that, thanks!



##########
lucene/grouping/src/test/org/apache/lucene/search/grouping/TestBlockGrouping.java:
##########
@@ -77,6 +77,64 @@ public void testSimple() throws IOException {
     shard.close();
   }
 
+  public void testShardedBlockGrouping() throws IOException {

Review Comment:
   Created a sub-IndexSearcher with passing an ExecutorService and overriding 
the slices() method so that the collector manager can work with intra-segment 
concurrency. Use this sub-IndexSearcher in all of the test method in this file. 
   
   I found this testShardedGrouping() is not needed, since we can only compare 
the totalHitCount of the final result between a single collector manager and 
one collector manager per shard, that's because when sorting by relevance,  the 
final top-5 groups reduced by collector manager may differ from the one 
collector manager per shard's top-5 due to different score distributions, so 
finally I removed this test.



##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/TopGroups.java:
##########
@@ -290,4 +296,146 @@ public static <T> TopGroups<T> merge(
           totalMaxScore);
     }
   }
+
+  private record MergedBlockGroup(Object[] topValues, int shardIndex, int 
groupIndex) {}
+
+  private static class GroupComparator implements Comparator<MergedBlockGroup> 
{
+    @SuppressWarnings("rawtypes")
+    public final FieldComparator[] comparators;
+
+    public final int[] reversed;
+
+    @SuppressWarnings({"rawtypes"})
+    public GroupComparator(Sort groupSort) {
+      final SortField[] sortFields = groupSort.getSort();
+      comparators = new FieldComparator[sortFields.length];
+      reversed = new int[sortFields.length];
+      for (int compIDX = 0; compIDX < sortFields.length; compIDX++) {
+        final SortField sortField = sortFields[compIDX];
+        comparators[compIDX] = sortField.getComparator(1, Pruning.NONE);
+        reversed[compIDX] = sortField.getReverse() ? -1 : 1;
+      }
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked"})
+    public int compare(MergedBlockGroup group, MergedBlockGroup other) {
+      if (group == other) {
+        return 0;
+      }
+      final Object[] groupValues = group.topValues;
+      final Object[] otherValues = other.topValues;
+      for (int compIDX = 0; compIDX < comparators.length; compIDX++) {
+        final int c =
+            reversed[compIDX]
+                * comparators[compIDX].compareValues(groupValues[compIDX], 
otherValues[compIDX]);
+        if (c != 0) {
+          return c;
+        }
+      }
+
+      assert group.shardIndex != other.shardIndex;
+      return group.shardIndex - other.shardIndex;
+    }
+  }
+
+  /**
+   * Merge TopGroups that are partitioned into blocks per shard. This method 
assumes that within
+   * each shard, the groups are sorted according to the groupSort.
+   *
+   * @param shardGroups list of TopGroups, one per shard.
+   * @param groupSort The {@link Sort} used to sort the groups. The top sorted 
document within each
+   *     * group according to groupSort, determines how that group sorts 
against other groups. This
+   *     * must be non-null, ie, if you want to groupSort by relevance use 
Sort.RELEVANCE.
+   * @param groupOffset Which group to start from.
+   * @param topNGroups How many top groups to keep.
+   * @param docSort The sort to use within each group
+   * @return TopGroups instance or null if there are no groups.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> TopGroups<T> mergeBlockGroups(
+      List<TopGroups<T>> shardGroups,
+      Sort groupSort,
+      int groupOffset,
+      int topNGroups,
+      Sort docSort) {
+    if (shardGroups.isEmpty()) {
+      return new TopGroups<>(
+          groupSort.getSort(),
+          docSort.getSort(),
+          0,
+          0,
+          (GroupDocs<T>[]) new GroupDocs<?>[0],
+          Float.NaN);
+    }
+
+    int totalGroupCount = 0;
+    int totalHitCount = 0;
+    int totalGroupedHitCount = 0;
+    for (TopGroups<T> sg : shardGroups) {
+      totalGroupCount += sg.totalGroupCount;
+      totalHitCount += sg.totalHitCount;
+    }
+
+    // k-way merge
+    GroupComparator groupComp = new GroupComparator(groupSort);
+    NavigableSet<MergedBlockGroup> queue = new TreeSet<>(groupComp);
+
+    float totalMaxScore = Float.NaN;
+    final boolean groupSortByRelevance = groupSort.equals(Sort.RELEVANCE);
+    // init queue
+    for (int idx = 0; idx < shardGroups.size(); idx++) {
+      TopGroups<T> topGroups = shardGroups.get(idx);
+      if (topGroups.groups.length == 0) {
+        continue;
+      }
+      if (!groupSortByRelevance) {
+        totalMaxScore = nonNANmax(totalMaxScore, topGroups.maxScore);
+      }
+      GroupDocs<T> firstGroupDocs = topGroups.groups[0];
+      queue.add(new MergedBlockGroup(firstGroupDocs.groupSortValues(), idx, 
0));
+    }
+
+    if (groupSortByRelevance) {
+      totalMaxScore = shardGroups.get(queue.first().shardIndex).maxScore;

Review Comment:
   Added a check for the queue.



##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollectorManager.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search.grouping;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.lucene.search.CollectorManager;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.Weight;
+
+/**
+ * A {@link CollectorManager} for {@link BlockGroupingCollector} that merges 
results from multiple
+ * collectors into a single {@link TopGroups}. This is intended for use with 
concurrent search,
+ * where each segment is searched by a separate {@link BlockGroupingCollector}.
+ *
+ * <p>Documents must be indexed as blocks using {@link
+ * org.apache.lucene.index.IndexWriter#addDocuments 
IndexWriter.addDocuments()} or {@link
+ * org.apache.lucene.index.IndexWriter#updateDocuments 
IndexWriter.updateDocuments()}.
+ *
+ * <p>See {@link BlockGroupingCollector} for more details.
+ *
+ * <p>Example usage:
+ *
+ * <pre class="prettyprint">
+ * IndexSearcher searcher = new IndexSearcher(reader);
+ * Query lastDocPerGroupQuery = new TermQuery(new Term("groupEnd", "true"));
+ * Weight lastDocPerGroup = searcher.createWeight(
+ *     searcher.rewrite(lastDocPerGroupQuery), ScoreMode.COMPLETE_NO_SCORES, 
1);
+ *
+ * BlockGroupingCollectorManager&lt;BytesRef&gt; manager = new 
BlockGroupingCollectorManager&lt;&gt;(
+ *     Sort.RELEVANCE,   // groupSort
+ *     0,                // groupOffset
+ *     10,               // topNGroups
+ *     true,             // needsScores
+ *     lastDocPerGroup,
+ *     Sort.RELEVANCE,   // withinGroupSort
+ *     0,                // withinGroupOffset
+ *     5);               // maxDocsPerGroup
+ *
+ * TopGroups&lt;BytesRef&gt; result = searcher.search(query, manager);
+ * </pre>
+ *
+ * @lucene.experimental
+ */
+public class BlockGroupingCollectorManager<T>
+    implements CollectorManager<BlockGroupingCollector, TopGroups<T>> {
+
+  private final Sort groupSort;
+  private final int groupOffset;
+  private final int topNGroups;
+  private final boolean needsScores;
+  private final Weight lastDocPerGroup;
+
+  private final Sort withinGroupSort;
+  private final int withinGroupOffset;
+  private final int maxDocsPerGroup;
+
+  /**
+   * Creates a new BlockGroupingCollectorManager.
+   *
+   * @param groupSort the sort used to rank groups
+   * @param groupOffset the offset into the groups to start returning from
+   * @param topNGroups the number of top groups to collect
+   * @param needsScores whether scores are needed (must be true if groupSort 
or withinGroupSort uses
+   *     scores)
+   * @param lastDocPerGroup a {@link Weight} that matches the last document in 
each group block
+   * @param withinGroupSort the sort used to rank documents within each group
+   * @param withinGroupOffset the offset into each group's documents to start 
returning from
+   * @param maxDocsPerGroup the maximum number of documents to return per group
+   */
+  public BlockGroupingCollectorManager(
+      Sort groupSort,
+      int groupOffset,
+      int topNGroups,
+      boolean needsScores,
+      Weight lastDocPerGroup,
+      Sort withinGroupSort,
+      int withinGroupOffset,
+      int maxDocsPerGroup) {
+    this.groupSort = groupSort;
+    this.groupOffset = groupOffset;
+    this.topNGroups = topNGroups;
+    this.needsScores = needsScores;
+    this.lastDocPerGroup = lastDocPerGroup;
+    this.withinGroupSort = withinGroupSort;
+    this.withinGroupOffset = withinGroupOffset;
+    this.maxDocsPerGroup = maxDocsPerGroup;
+  }
+
+  @Override
+  public BlockGroupingCollector newCollector() throws IOException {
+    return new BlockGroupingCollector(groupSort, topNGroups, needsScores, 
lastDocPerGroup);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public TopGroups<T> reduce(Collection<BlockGroupingCollector> collectors) 
throws IOException {
+    // Merge results from multiple collectors

Review Comment:
   Removed yet.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to