This is an automated email from the ASF dual-hosted git repository.

dsmiley pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 22f8fd3d4f6 SOLR-17723: FeaturesSelectionStream: refactorings (#3296)
22f8fd3d4f6 is described below

commit 22f8fd3d4f61c208427271aadcc6c62b16e0f2a1
Author: David Smiley <[email protected]>
AuthorDate: Thu Apr 10 15:08:22 2025 -0400

    SOLR-17723: FeaturesSelectionStream: refactorings (#3296)
    
    Use Java Streams with on-demand tuple iteration
    Use Java Map.merge
    Use NamedList.forEach instead of indexed iteration
---
 .../solrj/io/stream/FeaturesSelectionStream.java   | 89 ++++++++++------------
 .../solr/client/solrj/io/stream/KnnStream.java     | 12 +--
 .../solr/client/solrj/io/stream/RandomStream.java  | 12 +--
 3 files changed, 46 insertions(+), 67 deletions(-)

diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
index 1f7d7d65e0f..1a694fc72b1 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
@@ -25,16 +25,17 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.stream.Stream;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
@@ -335,12 +336,13 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
         .withExpression(toExpression(factory).toString());
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public Tuple read() throws IOException {
     try {
       if (tupleIterator == null) {
-        Map<String, Double> termScores = new HashMap<>();
-        Map<String, Long> docFreqs = new HashMap<>();
+        final Map<String, Double> termScores = new HashMap<>();
+        final Map<String, Long> docFreqs = new HashMap<>();
 
         long numDocs = 0;
         for (NamedList<?> resp : callShards(getShardUrls())) {
@@ -352,56 +354,49 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
 
           numDocs += (Integer) resp.get("numDocs");
 
-          for (int i = 0; i < shardTopTerms.size(); i++) {
-            String term = shardTopTerms.getName(i);
-            double score = shardTopTerms.getVal(i);
-            int docFreq = shardDocFreqs.get(term);
-            double prevScore = termScores.containsKey(term) ? 
termScores.get(term) : 0;
-            long prevDocFreq = docFreqs.containsKey(term) ? docFreqs.get(term) 
: 0;
-            termScores.put(term, prevScore + score);
-            docFreqs.put(term, prevDocFreq + docFreq);
-          }
-        }
-
-        List<Tuple> tuples = new ArrayList<>(numTerms);
-        termScores = sortByValue(termScores);
-        int index = 0;
-        for (Map.Entry<String, Double> termScore : termScores.entrySet()) {
-          if (tuples.size() == numTerms) break;
-          index++;
-          Tuple tuple = new Tuple();
-          tuple.put(ID, featureSet + "_" + index);
-          tuple.put("index_i", index);
-          tuple.put("term_s", termScore.getKey());
-          tuple.put("score_f", termScore.getValue());
-          tuple.put("featureSet_s", featureSet);
-          long docFreq = docFreqs.get(termScore.getKey());
-          double d = Math.log(((double) numDocs / (double) (docFreq + 1)));
-          tuple.put("idf_d", d);
-          tuples.add(tuple);
+          shardTopTerms.forEach(
+              (term, score) -> {
+                int docFreq = shardDocFreqs.get(term);
+                termScores.merge(term, score, Double::sum);
+                docFreqs.merge(term, (long) docFreq, Long::sum);
+              });
         }
-
-        tuples.add(Tuple.EOF());
-
-        tupleIterator = tuples.iterator();
+        final long numDocsF = numDocs; // make final
+
+        final AtomicInteger idGen = new AtomicInteger(1);
+
+        tupleIterator =
+            termScores.entrySet().stream()
+                .sorted( // sort by score descending
+                    Comparator.<Map.Entry<String, 
Double>>comparingDouble(Entry::getValue)
+                        .reversed())
+                .limit(numTerms)
+                .map(
+                    (termScore) -> {
+                      int index = idGen.getAndIncrement();
+                      Tuple tuple = new Tuple();
+                      tuple.put(ID, featureSet + "_" + index);
+                      tuple.put("index_i", index);
+                      tuple.put("term_s", termScore.getKey());
+                      tuple.put("score_f", termScore.getValue());
+                      tuple.put("featureSet_s", featureSet);
+                      long docFreq = docFreqs.get(termScore.getKey());
+                      double d = Math.log(((double) numDocsF / (double) 
(docFreq + 1)));
+                      tuple.put("idf_d", d);
+                      return tuple;
+                    })
+                .iterator();
+      }
+      if (tupleIterator.hasNext()) {
+        return tupleIterator.next();
+      } else {
+        return Tuple.EOF();
       }
-
-      return tupleIterator.next();
     } catch (Exception e) {
       throw new IOException(e);
     }
   }
 
-  private <K, V extends Comparable<? super V>> Map<K, V> sortByValue(Map<K, V> 
map) {
-    Map<K, V> result = new LinkedHashMap<>();
-    Stream<Map.Entry<K, V>> st = map.entrySet().stream();
-
-    st.sorted(Map.Entry.comparingByValue((c1, c2) -> c2.compareTo(c1)))
-        .forEachOrdered(e -> result.put(e.getKey(), e.getValue()));
-
-    return result;
-  }
-
   protected static class FeaturesSelectionCall implements 
Callable<NamedList<?>> {
 
     private final String baseUrl;
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
index 7f514a71b04..b88867e73b6 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
@@ -45,6 +45,7 @@ import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 
 /**
@@ -210,7 +211,7 @@ public class KnnStream extends TupleStream implements Expressible {
       doCloseCache = false;
     }
 
-    ModifiableSolrParams params = getParams(this.props);
+    var params = new ModifiableSolrParams(new MapSolrParams(this.props)); // 
copy
 
     StringBuilder builder = new StringBuilder();
 
@@ -261,15 +262,6 @@ public class KnnStream extends TupleStream implements Expressible {
     }
   }
 
-  private ModifiableSolrParams getParams(Map<String, String> props) {
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    for (Entry<String, String> entry : props.entrySet()) {
-      String value = entry.getValue();
-      params.add(entry.getKey(), value);
-    }
-    return params;
-  }
-
   @Override
   public int getCost() {
     return 0;
diff --git 
a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
index c802fd870ee..de7db9b6b2c 100644
--- 
a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
+++ 
b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
@@ -47,6 +47,7 @@ import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 
 /**
@@ -215,7 +216,7 @@ public class RandomStream extends TupleStream implements Expressible {
       doCloseCache = false;
     }
 
-    ModifiableSolrParams params = getParams(this.props);
+    var params = new ModifiableSolrParams(new MapSolrParams(this.props)); // 
copy
 
     params.remove(SORT); // Override any sort.
 
@@ -264,15 +265,6 @@ public class RandomStream extends TupleStream implements Expressible {
     }
   }
 
-  private ModifiableSolrParams getParams(Map<String, String> props) {
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    for (Entry<String, String> entry : props.entrySet()) {
-      String value = entry.getValue();
-      params.add(entry.getKey(), value);
-    }
-    return params;
-  }
-
   @Override
   public int getCost() {
     return 0;

Reply via email to