This is an automated email from the ASF dual-hosted git repository.
dsmiley pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 22f8fd3d4f6 SOLR-17723: FeaturesSelectionStream: refactorings (#3296)
22f8fd3d4f6 is described below
commit 22f8fd3d4f61c208427271aadcc6c62b16e0f2a1
Author: David Smiley <[email protected]>
AuthorDate: Thu Apr 10 15:08:22 2025 -0400
SOLR-17723: FeaturesSelectionStream: refactorings (#3296)
Use Java Streams, producing tuples on demand as they are iterated
Use Java Map.merge
Use NamedList.forEach instead of indexed iteration
---
.../solrj/io/stream/FeaturesSelectionStream.java | 89 ++++++++++------------
.../solr/client/solrj/io/stream/KnnStream.java | 12 +--
.../solr/client/solrj/io/stream/RandomStream.java | 12 +--
3 files changed, 46 insertions(+), 67 deletions(-)
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
index 1f7d7d65e0f..1a694fc72b1 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
@@ -25,16 +25,17 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.stream.Stream;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
@@ -335,12 +336,13 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
.withExpression(toExpression(factory).toString());
}
+ @SuppressWarnings("unchecked")
@Override
public Tuple read() throws IOException {
try {
if (tupleIterator == null) {
- Map<String, Double> termScores = new HashMap<>();
- Map<String, Long> docFreqs = new HashMap<>();
+ final Map<String, Double> termScores = new HashMap<>();
+ final Map<String, Long> docFreqs = new HashMap<>();
long numDocs = 0;
for (NamedList<?> resp : callShards(getShardUrls())) {
@@ -352,56 +354,49 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
numDocs += (Integer) resp.get("numDocs");
- for (int i = 0; i < shardTopTerms.size(); i++) {
- String term = shardTopTerms.getName(i);
- double score = shardTopTerms.getVal(i);
- int docFreq = shardDocFreqs.get(term);
- double prevScore = termScores.containsKey(term) ? termScores.get(term) : 0;
- long prevDocFreq = docFreqs.containsKey(term) ? docFreqs.get(term) : 0;
- termScores.put(term, prevScore + score);
- docFreqs.put(term, prevDocFreq + docFreq);
- }
- }
-
- List<Tuple> tuples = new ArrayList<>(numTerms);
- termScores = sortByValue(termScores);
- int index = 0;
- for (Map.Entry<String, Double> termScore : termScores.entrySet()) {
- if (tuples.size() == numTerms) break;
- index++;
- Tuple tuple = new Tuple();
- tuple.put(ID, featureSet + "_" + index);
- tuple.put("index_i", index);
- tuple.put("term_s", termScore.getKey());
- tuple.put("score_f", termScore.getValue());
- tuple.put("featureSet_s", featureSet);
- long docFreq = docFreqs.get(termScore.getKey());
- double d = Math.log(((double) numDocs / (double) (docFreq + 1)));
- tuple.put("idf_d", d);
- tuples.add(tuple);
+ shardTopTerms.forEach(
+ (term, score) -> {
+ int docFreq = shardDocFreqs.get(term);
+ termScores.merge(term, score, Double::sum);
+ docFreqs.merge(term, (long) docFreq, Long::sum);
+ });
}
-
- tuples.add(Tuple.EOF());
-
- tupleIterator = tuples.iterator();
+ final long numDocsF = numDocs; // make final
+
+ final AtomicInteger idGen = new AtomicInteger(1);
+
+ tupleIterator =
+ termScores.entrySet().stream()
+ .sorted( // sort by score descending
+ Comparator.<Map.Entry<String, Double>>comparingDouble(Entry::getValue)
+ .reversed())
+ .limit(numTerms)
+ .map(
+ (termScore) -> {
+ int index = idGen.getAndIncrement();
+ Tuple tuple = new Tuple();
+ tuple.put(ID, featureSet + "_" + index);
+ tuple.put("index_i", index);
+ tuple.put("term_s", termScore.getKey());
+ tuple.put("score_f", termScore.getValue());
+ tuple.put("featureSet_s", featureSet);
+ long docFreq = docFreqs.get(termScore.getKey());
+ double d = Math.log(((double) numDocsF / (double) (docFreq + 1)));
+ tuple.put("idf_d", d);
+ return tuple;
+ })
+ .iterator();
+ }
+ if (tupleIterator.hasNext()) {
+ return tupleIterator.next();
+ } else {
+ return Tuple.EOF();
}
-
- return tupleIterator.next();
} catch (Exception e) {
throw new IOException(e);
}
}
- private <K, V extends Comparable<? super V>> Map<K, V> sortByValue(Map<K, V> map) {
- Map<K, V> result = new LinkedHashMap<>();
- Stream<Map.Entry<K, V>> st = map.entrySet().stream();
-
- st.sorted(Map.Entry.comparingByValue((c1, c2) -> c2.compareTo(c1)))
- .forEachOrdered(e -> result.put(e.getKey(), e.getValue()));
-
- return result;
- }
-
protected static class FeaturesSelectionCall implements Callable<NamedList<?>> {
private final String baseUrl;
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
index 7f514a71b04..b88867e73b6 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
@@ -45,6 +45,7 @@ import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
/**
@@ -210,7 +211,7 @@ public class KnnStream extends TupleStream implements Expressible {
doCloseCache = false;
}
- ModifiableSolrParams params = getParams(this.props);
+ var params = new ModifiableSolrParams(new MapSolrParams(this.props)); // copy
StringBuilder builder = new StringBuilder();
@@ -261,15 +262,6 @@ public class KnnStream extends TupleStream implements Expressible {
}
}
- private ModifiableSolrParams getParams(Map<String, String> props) {
- ModifiableSolrParams params = new ModifiableSolrParams();
- for (Entry<String, String> entry : props.entrySet()) {
- String value = entry.getValue();
- params.add(entry.getKey(), value);
- }
- return params;
- }
-
@Override
public int getCost() {
return 0;
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
index c802fd870ee..de7db9b6b2c 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
@@ -47,6 +47,7 @@ import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
/**
@@ -215,7 +216,7 @@ public class RandomStream extends TupleStream implements Expressible {
doCloseCache = false;
}
- ModifiableSolrParams params = getParams(this.props);
+ var params = new ModifiableSolrParams(new MapSolrParams(this.props)); // copy
params.remove(SORT); // Override any sort.
@@ -264,15 +265,6 @@ public class RandomStream extends TupleStream implements Expressible {
}
}
- private ModifiableSolrParams getParams(Map<String, String> props) {
- ModifiableSolrParams params = new ModifiableSolrParams();
- for (Entry<String, String> entry : props.entrySet()) {
- String value = entry.getValue();
- params.add(entry.getKey(), value);
- }
- return params;
- }
-
@Override
public int getCost() {
return 0;