Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2517#discussion_r177778458
--- Diff:
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java
---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.solr;
+
+import com.google.gson.stream.JsonWriter;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.FacetField;
+import org.apache.solr.client.solrj.response.FieldStatsInfo;
+import org.apache.solr.client.solrj.response.IntervalFacet;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.RangeFacet;
+import org.apache.solr.client.solrj.response.RangeFacet.Count;
+import org.apache.solr.common.params.FacetParams;
+import org.apache.solr.common.params.MultiMapSolrParams;
+import org.apache.solr.common.params.StatsParams;
+import org.apache.solr.servlet.SolrRequestParsers;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
+import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
+import static
org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
+import static
org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
+import static
org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
+import static
org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
+import static
org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
+import static
org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
+import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
+import static
org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
+import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER;
+
+@Tags({"Apache", "Solr", "Get", "Query", "Records"})
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile
in the format of XML or using a Record Writer")
+@WritesAttributes({
+ @WritesAttribute(attribute = "solr.connect", description = "Solr
connect string"),
+ @WritesAttribute(attribute = "solr.collection", description =
"Solr collection"),
+ @WritesAttribute(attribute = "solr.query", description = "Query
string sent to Solr"),
+ @WritesAttribute(attribute = "solr.cursor.mark", description =
"Cursor mark can be used for scrolling Solr"),
+ @WritesAttribute(attribute = "solr.status.code", description =
"Status code of Solr request. A status code of 0 indicates that the request was
successfully processed"),
+ @WritesAttribute(attribute = "solr.query.time", description = "The
elapsed time to process the query (in ms)"),
+ @WritesAttribute(attribute = "mime.type", description = "The mime
type of the data format"),
+ @WritesAttribute(attribute = "fetchsolr.exeption.class",
description = "The Java exception class raised when the processor fails"),
+ @WritesAttribute(attribute = "fetchsolr.exeption.message",
description = "The Java exception message raised when the processor fails")
+})
+public class FetchSolr extends SolrProcessor {
+
+ public static final AllowableValue MODE_XML = new
AllowableValue("XML");
+ public static final AllowableValue MODE_REC = new
AllowableValue("Records");
+ public static final String MIME_TYPE_JSON = "application/json";
+ public static final String ATTRIBUTE_SOLR_CONNECT = "solr.connect";
+ public static final String ATTRIBUTE_SOLR_COLLECTION =
"solr.collection";
+ public static final String ATTRIBUTE_SOLR_QUERY = "solr.query";
+ public static final String ATTRIBUTE_CURSOR_MARK = "solr.cursor.mark";
+ public static final String ATTRIBUTE_SOLR_STATUS = "solr.status.code";
+ public static final String ATTRIBUTE_QUERY_TIME = "solr.query.time";
+ public static final String EXCEPTION = "fetchsolr.exeption";
+ public static final String EXCEPTION_MESSAGE =
"fetchsolr.exeption.message";
+
+ public static final PropertyDescriptor SOLR_QUERY_STRING = new
PropertyDescriptor
+ .Builder().name("solr_query_string")
+ .displayName("Solr Query String")
+ .description("A query string to execute against Solr")
+ .required(true)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
+ .expressionLanguageSupported(true)
+ .defaultValue("q=*:*")
+ .build();
+
+ public static final PropertyDescriptor RETURN_TYPE = new
PropertyDescriptor
+ .Builder().name("Return Type")
+ .displayName("Return Type")
+ .description("Output format of Solr results. Write Solr
documents to FlowFiles as XML or using a Record Writer")
+ .required(true)
+ .allowableValues(MODE_XML, MODE_REC)
+ .defaultValue(MODE_XML.getValue())
+ .build();
+
+ public static final PropertyDescriptor REQUEST_HANDLER = new
PropertyDescriptor
+ .Builder().name("request_handler")
+ .displayName("Request Handler")
+ .description("Define a request handler here, e. g. /query")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .defaultValue("/select")
+ .build();
+
+ public static final Relationship RESULTS = new
Relationship.Builder().name("results")
+ .description("Results of Solr queries").build();
+ public static final Relationship FACETS = new
Relationship.Builder().name("facets")
+ .description("Results of faceted search").build();
+ public static final Relationship STATS = new
Relationship.Builder().name("stats")
+ .description("Stats about Solr index").build();
+ public static final Relationship ORIGINAL = new
Relationship.Builder().name("original")
+ .description("Original flowfile").build();
+ public static final Relationship FAILURE = new
Relationship.Builder().name("failure")
+ .description("Failure relationship").build();
+
+ private Set<Relationship> relationships;
+ private List<PropertyDescriptor> descriptors;
+
+ @Override // Exposes the relationship set that init() stores in this.relationships.
+ public Set<Relationship> getRelationships() {
+ return this.relationships; // wrapped with Collections.unmodifiableSet in init()
+ }
+
+ @Override // Exposes the property list that init() stores in this.descriptors.
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return this.descriptors; // wrapped with Collections.unmodifiableList in init()
+ }
+
+ @Override // Assembles the property descriptors and relationships this processor exposes.
+ protected void init(final ProcessorInitializationContext context) {
+ super.init(context);
+
+ final List<PropertyDescriptor> descriptors = new ArrayList<>(); // ArrayList keeps the insertion order used below
+ descriptors.add(SOLR_TYPE); // descriptors without a FetchSolr definition are static imports from SolrUtils
+ descriptors.add(SOLR_LOCATION);
+ descriptors.add(COLLECTION);
+ descriptors.add(SOLR_QUERY_STRING); // declared in this class
+ descriptors.add(RETURN_TYPE); // declared in this class
+ descriptors.add(RECORD_WRITER); // NOTE(review): presumably only relevant for Return Type "Records" - confirm
+ descriptors.add(REQUEST_HANDLER); // declared in this class
+ descriptors.add(JAAS_CLIENT_APP_NAME);
+ descriptors.add(BASIC_USERNAME);
+ descriptors.add(BASIC_PASSWORD);
+ descriptors.add(SSL_CONTEXT_SERVICE);
+ descriptors.add(SOLR_SOCKET_TIMEOUT);
+ descriptors.add(SOLR_CONNECTION_TIMEOUT);
+ descriptors.add(SOLR_MAX_CONNECTIONS);
+ descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
+ descriptors.add(ZK_CLIENT_TIMEOUT);
+ descriptors.add(ZK_CONNECTION_TIMEOUT);
+ this.descriptors = Collections.unmodifiableList(descriptors); // read-only view returned by getSupportedPropertyDescriptors()
+
+ final Set<Relationship> relationships = new HashSet<>(); // HashSet: no ordering among relationships
+ relationships.add(FAILURE);
+ relationships.add(RESULTS);
+ relationships.add(FACETS);
+ relationships.add(STATS);
+ relationships.add(ORIGINAL);
+ this.relationships = Collections.unmodifiableSet(relationships); // read-only view returned by getRelationships()
+ }
+
+ public static final Set<String> SUPPORTED_SEARCH_COMPONENTS = new
HashSet<String>(); // NOTE(review): public and mutable; callers could alter it
+ static {
+
SUPPORTED_SEARCH_COMPONENTS.addAll(Arrays.asList(StatsParams.STATS,
FacetParams.FACET)); // seeded with the StatsParams.STATS and FacetParams.FACET parameter names
+ }
+ public static final Set<String> SEARCH_COMPONENTS_ON = new
HashSet<String>(); // NOTE(review): public and mutable; callers could alter it
+ static {
+ SEARCH_COMPONENTS_ON.addAll(Arrays.asList("true", "on", "yes")); // presumably the values that switch a search component on - confirm in extractSearchComponents
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ final ComponentLog logger = getLogger();
+
+ FlowFile flowFileRequest = session.get();
+ boolean keepOriginal;
+
+ if (flowFileRequest == null) {
+ if (context.hasNonLoopConnection()) {
+ return;
+ }
+ flowFileRequest = session.create();
+ keepOriginal = false;
+ } else {
+ keepOriginal = true;
+ }
+
+ final String queryString =
context.getProperty(SOLR_QUERY_STRING).evaluateAttributeExpressions(flowFileRequest).getValue();
+
+ final Map<String,String[]> solrParams =
SolrRequestParsers.parseQueryString(queryString).getMap();
+ final Set<String> searchComponents =
extractSearchComponents(solrParams);
+ final SolrQuery solrQuery = new SolrQuery();
+ solrQuery.add(new MultiMapSolrParams(solrParams));
+
+ final String requestHandler =
context.getProperty(REQUEST_HANDLER).getValue();
+ solrQuery.setRequestHandler(requestHandler);
+
+ final QueryRequest req = new QueryRequest(solrQuery);
+
+ if (isBasicAuthEnabled()) {
+ req.setBasicAuthCredentials(getUsername(), getPassword());
+ }
+
+ flowFileRequest = session.putAttribute(flowFileRequest,
ATTRIBUTE_SOLR_CONNECT, getSolrLocation());
+ if
(SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
+ flowFileRequest = session.putAttribute(flowFileRequest,
ATTRIBUTE_SOLR_COLLECTION,
context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue());
+ }
+ flowFileRequest = session.putAttribute(flowFileRequest,
ATTRIBUTE_SOLR_QUERY, solrQuery.toString());
+
+ FlowFile flowFileResponse;
+ if (!keepOriginal) {
+ flowFileResponse = flowFileRequest;
+ flowFileRequest = null;
+ } else {
+ flowFileResponse = session.create(flowFileRequest);
+ }
+
+ try {
+ final QueryResponse response = req.process(getSolrClient());
+
+ Map<String,String> responseAttributes = new HashMap<>();
+ responseAttributes.put(ATTRIBUTE_CURSOR_MARK,
response.getNextCursorMark());
+ responseAttributes.put(ATTRIBUTE_SOLR_STATUS,
String.valueOf(response.getStatus()));
+ responseAttributes.put(ATTRIBUTE_QUERY_TIME,
String.valueOf(response.getQTime()));
+ flowFileResponse = session.putAllAttributes(flowFileResponse,
responseAttributes);
+
+ if (response.getResults().size() > 0) {
--- End diff --
What is the expected behavior when there are more results than were asked
for in the initial search?
Say there are 1k total results, but the query asked for rows 0 to 10.
Are we going to page through the results and send out multiple flow files?
Or do we simply not handle this case, leaving it up to the user to set rows
large enough (which also means they could exhaust memory if they ask for too
many results in a single query)?
---