[
https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392256#comment-16392256
]
ASF GitHub Bot commented on NIFI-4516:
--------------------------------------
Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2517#discussion_r173347053
--- Diff:
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestFetchSolr.java
---
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.solr;
+
+import com.google.gson.stream.JsonReader;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.xmlunit.matchers.CompareMatcher;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+public class TestFetchSolr {
+ static final String DEFAULT_SOLR_CORE = "testCollection";
+
+ private static final SimpleDateFormat df = new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
+ static {
+ df.setTimeZone(TimeZone.getTimeZone("GMT"));
+ }
+
+ private SolrClient solrClient;
+
+ @Before
+ public void setup() {
+
+ try {
+
+ // create an EmbeddedSolrServer for the processor to use
+ String relPath =
getClass().getProtectionDomain().getCodeSource()
+ .getLocation().getFile() + "../../target";
+
+ solrClient =
EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME,
+ DEFAULT_SOLR_CORE, relPath);
+
+ for (int i = 0; i < 10; i++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "doc" + i);
+ Date date = new Date();
+ doc.addField("created", df.format(date));
+ doc.addField("string_single", "single" + i + ".1");
+ doc.addField("string_multi", "multi" + i + ".1");
+ doc.addField("string_multi", "multi" + i + ".2");
+ doc.addField("integer_single", i);
+ doc.addField("integer_multi", 1);
+ doc.addField("integer_multi", 2);
+ doc.addField("integer_multi", 3);
+ doc.addField("double_single", 0.5 + i);
+
+ solrClient.add(doc);
+ System.out.println(doc.getField("created").getValue());
+
+ }
+ solrClient.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @After
+ public void teardown() {
+ try {
+ solrClient.close();
+ } catch (Exception e) {
+ }
+ }
+
+ @Test
+ public void testAllFacetCategories() throws IOException {
+ final TestableProcessor proc = new TestableProcessor(solrClient);
+
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(SolrUtils.SOLR_TYPE,
SolrUtils.SOLR_TYPE_CLOUD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION,
"http://localhost:8443/solr");
+ runner.setProperty(SolrUtils.COLLECTION, "testCollection");
+ runner.setProperty(FetchSolr.SOLR_QUERY_STRING, "q=*:*" +
+
"&facet=true&facet.interval=integer_single&facet.interval.set=[4,7]&facet.interval.set=[5,7]"
+
+
"&facet.field=integer_multi&facet.query=integer_multi:2&stats=true&stats.field=integer_multi"
+
+
"&facet.range=created&facet.range.start=NOW/MINUTE&facet.range.end=NOW/MINUTE%2B1MINUTE&facet.range.gap=%2B20SECOND"
+
+
"&facet.query=*:*&facet.query=integer_multi:2&facet.query=integer_multi:3"
+ );
+ runner.enqueue(new ByteArrayInputStream("test".getBytes()));
+ runner.run();
+ runner.assertTransferCount(FetchSolr.FACETS, 1);
+
+ JsonReader reader = new JsonReader(new InputStreamReader(new
ByteArrayInputStream(
+
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(FetchSolr.FACETS).get(0)))));
+ reader.beginObject();
+ while (reader.hasNext()) {
+ String name = reader.nextName();
+ if (name.equals("facet_queries")) {
+ assertEquals(returnCheckSumForArrayOfJsonObjects(reader),
30);
+ } else if (name.equals("facet_fields")) {
+ reader.beginObject();
+ assertEquals(reader.nextName(), "integer_multi");
+ assertEquals(returnCheckSumForArrayOfJsonObjects(reader),
30);
+ reader.endObject();
+ } else if (name.equals("facet_ranges")) {
+ reader.beginObject();
+ assertEquals(reader.nextName(), "created");
+ assertEquals(returnCheckSumForArrayOfJsonObjects(reader),
10);
+ reader.endObject();
+ } else if (name.equals("facet_intervals")) {
+ reader.beginObject();
+ assertEquals(reader.nextName(), "integer_single");
+ assertEquals(returnCheckSumForArrayOfJsonObjects(reader),
7);
+ reader.endObject();
+ }
+ }
+ reader.endObject();
+ reader.close();
+ }
+
+ private int returnCheckSumForArrayOfJsonObjects(JsonReader reader)
throws IOException {
+ int checkSum = 0;
+ reader.beginArray();
+ while (reader.hasNext()) {
+ reader.beginObject();
+ while (reader.hasNext()) {
+ if (reader.nextName().equals("count"))
+ checkSum += reader.nextInt();
+ else
+ reader.skipValue();
+ }
+ reader.endObject();
+ }
+ reader.endArray();
+ return checkSum;
+ }
+
+ @Test
+ public void testFacetTrueButNull() throws IOException {
+ final TestableProcessor proc = new TestableProcessor(solrClient);
+
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(SolrUtils.SOLR_TYPE,
SolrUtils.SOLR_TYPE_CLOUD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION,
"http://localhost:8443/solr");
+ runner.setProperty(SolrUtils.COLLECTION, "testCollection");
+ runner.setProperty(FetchSolr.SOLR_QUERY_STRING,
"q=*:*&facet=true&stats=true");
+ runner.enqueue(new ByteArrayInputStream("test".getBytes()));
+ runner.run();
+
+ runner.assertTransferCount(FetchSolr.FACETS, 1);
+ runner.assertTransferCount(FetchSolr.STATS, 1);
+
+ // Check for empty nestet Objects in JSON
+ JsonReader reader = new JsonReader(new InputStreamReader(new
ByteArrayInputStream(
+
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(FetchSolr.FACETS).get(0)))));
+ reader.beginObject();
+ while (reader.hasNext()) {
+ if (reader.nextName().equals("facet_queries")) {
+ reader.beginArray();
+ assertFalse(reader.hasNext());
+ reader.endArray();
+ } else {
+ reader.beginObject();
+ assertFalse(reader.hasNext());
+ reader.endObject();
+ }
+ }
+ reader.endObject();
+
+ JsonReader reader_stats = new JsonReader(new InputStreamReader(new
ByteArrayInputStream(
+
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(FetchSolr.STATS).get(0)))));
+ reader_stats.beginObject();
+ assertEquals(reader_stats.nextName(), "stats_fields");
+ reader_stats.beginObject();
+ assertFalse(reader_stats.hasNext());
+ reader_stats.endObject();
+ reader_stats.endObject();
+
+ reader.close();
+ reader_stats.close();
+ }
+
+ @Test
+ public void testStats() throws IOException {
+ final TestableProcessor proc = new TestableProcessor(solrClient);
+
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(SolrUtils.SOLR_TYPE,
SolrUtils.SOLR_TYPE_CLOUD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION,
"http://localhost:8443/solr");
+ runner.setProperty(SolrUtils.COLLECTION, "testCollection");
+ runner.setProperty(FetchSolr.SOLR_QUERY_STRING,
"q=*:*&stats=true&stats.field=integer_single");
+ runner.enqueue(new ByteArrayInputStream("test".getBytes()));
+ runner.run();
+
+ runner.assertTransferCount(FetchSolr.STATS, 1);
+ JsonReader reader = new JsonReader(new InputStreamReader(new
ByteArrayInputStream(
+
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(FetchSolr.STATS).get(0)))));
+ reader.beginObject();
+ assertEquals(reader.nextName(), "stats_fields");
+ reader.beginObject();
+ assertEquals(reader.nextName(), "integer_single");
+ reader.beginObject();
+ while (reader.hasNext()) {
+ String name = reader.nextName();
+ switch (name) {
+ case "min": assertEquals(reader.nextString(), "0.0");
break;
+ case "max": assertEquals(reader.nextString(), "9.0");
break;
+ case "count": assertEquals(reader.nextInt(), 10); break;
+ case "sum": assertEquals(reader.nextString(), "45.0");
break;
+ default: reader.skipValue(); break;
+ }
+ }
+ reader.endObject();
+ reader.endObject();
+ reader.endObject();
+ reader.close();
+ }
+
+ @Test
+ public void testFailure() {
+ final TestableProcessor proc = new TestableProcessor(solrClient);
+
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(SolrUtils.SOLR_TYPE,
SolrUtils.SOLR_TYPE_CLOUD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION,
"http://localhost:8443/solr");
+ runner.setProperty(SolrUtils.COLLECTION, "testCollection");
+ runner.setProperty(FetchSolr.REQUEST_HANDLER, "/sel");
+
+ runner.setNonLoopConnection(false);
+ runner.run();
+
+ runner.assertTransferCount(FetchSolr.FAILURE, 1);
+ runner.assertTransferCount(FetchSolr.ORIGINAL, 0);
+
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(FetchSolr.FAILURE).get(0);
+ flowFile.assertAttributeExists(FetchSolr.EXCEPTION);
+ flowFile.assertAttributeExists(FetchSolr.EXCEPTION_MESSAGE);
+ }
+
+ @Test
+ public void testExpressionLanguage() {
+ final TestableProcessor proc = new TestableProcessor(solrClient);
+
+ ByteArrayInputStream is = new ByteArrayInputStream(new byte[0]);
+
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(SolrUtils.SOLR_TYPE,
SolrUtils.SOLR_TYPE_CLOUD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION,
"http://localhost:8443/solr");
+ runner.setProperty(SolrUtils.COLLECTION, "testCollection");
+ runner.setProperty(FetchSolr.SOLR_QUERY_STRING, "${query}");
+
+ runner.enqueue(new byte[0], new HashMap<String,String>(){{
+ put("query", "q=id:doc0&fl=id");
+ }});
+ runner.run();
+ runner.assertTransferCount(FetchSolr.RESULTS, 1);
+
+ String expectedXml = "<docs><doc boost=\"1.0\"><field
name=\"id\">doc0</field></doc></docs>";
+ assertThat(expectedXml, CompareMatcher.isIdenticalTo(new
String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(FetchSolr.RESULTS).get(0)))));
+ }
+
+ @Test
+ public void testStandardResponse() {
+ final TestableProcessor proc = new TestableProcessor(solrClient);
+
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(SolrUtils.SOLR_TYPE,
SolrUtils.SOLR_TYPE_CLOUD.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION,
"http://localhost:8443/solr");
+ runner.setProperty(SolrUtils.COLLECTION, "testCollection");
+ runner.setProperty(FetchSolr.SOLR_QUERY_STRING, "q=id:(doc0 OR
doc1)&fl=id&sort=id desc");
+
+ runner.setNonLoopConnection(false);
+ runner.run();
+ runner.assertTransferCount(FetchSolr.RESULTS, 1);
+ runner.assertTransferCount(FetchSolr.ORIGINAL, 1);
+
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(FetchSolr.RESULTS).get(0);
+ flowFile.assertAttributeExists(FetchSolr.ATTRIBUTE_CURSOR_MARK);
+ flowFile.assertAttributeExists(FetchSolr.ATTRIBUTE_SOLR_STATUS);
+ flowFile.assertAttributeExists(FetchSolr.ATTRIBUTE_QUERY_TIME);
+
+ String expectedXml = "<docs><doc boost=\"1.0\"><field
name=\"id\">doc1</field></doc><doc boost=\"1.0\"><field
name=\"id\">doc0</field></doc></docs>";
+ assertThat(expectedXml, CompareMatcher.isIdenticalTo(new
String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(FetchSolr.RESULTS).get(0)))));
+ }
+
+ @Test
+ public void testRecordResponse() throws IOException,
InitializationException {
+ final TestableProcessor proc = new TestableProcessor(solrClient);
+
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(SolrUtils.SOLR_TYPE,
SolrUtils.SOLR_TYPE_CLOUD.getValue());
+ runner.setProperty(FetchSolr.RETURN_TYPE,
FetchSolr.MODE_REC.getValue());
+ runner.setProperty(SolrUtils.SOLR_LOCATION,
"http://localhost:8443/solr");
+ runner.setProperty(SolrUtils.COLLECTION, "testCollection");
+ runner.setProperty(FetchSolr.SOLR_QUERY_STRING,
"q=*:*&fl=id,created,integer_single&rows=10");
+
+ final String outputSchemaText = new
String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc")));
+
+ final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+ runner.addControllerService("writer", jsonWriter);
+ runner.setProperty(jsonWriter,
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT,
outputSchemaText);
+ runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+ runner.setProperty(jsonWriter, "Schema Write Strategy",
"full-schema-attribute");
+ runner.enableControllerService(jsonWriter);
+ runner.setProperty(SolrUtils.RECORD_WRITER, "writer");
+
+ runner.setNonLoopConnection(false);
+
+ runner.run(1,true, true);
+ runner.assertQueueEmpty();
+ runner.assertTransferCount(FetchSolr.RESULTS, 1);
+ runner.assertTransferCount(FetchSolr.ORIGINAL, 1);
+ for (MockFlowFile flowFile :
runner.getFlowFilesForRelationship(FetchSolr.RESULTS))
--- End diff --
Curly brackets.
> Add FetchSolr processor
> -----------------------
>
> Key: NIFI-4516
> URL: https://issues.apache.org/jira/browse/NIFI-4516
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Johannes Peter
> Assignee: Johannes Peter
> Priority: Major
> Labels: features
>
> The processor shall be capable
> * to query Solr within a workflow,
> * to make use of standard functionalities of Solr such as faceting,
> highlighting, result grouping, etc.,
> * to make use of NiFis expression language to build Solr queries,
> * to handle results as records.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)