Repository: nifi Updated Branches: refs/heads/master 0e736f59f -> 5ca6261de
NIFI-3576 Support for QueryInfo relationship, can be used to track no-hits Squashed commit includes related commit from GitHub user wietze. This closes #2601 Signed-off-by: Mike Thomsen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/45bc1f1b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/45bc1f1b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/45bc1f1b Branch: refs/heads/master Commit: 45bc1f1b4f590db0e4a3cb76fcd4cd89ca1a06bf Parents: 2799211 Author: Otto Fowler <[email protected]> Authored: Mon Apr 2 13:16:39 2018 -0400 Committer: Mike Thomsen <[email protected]> Committed: Mon Apr 23 15:38:27 2018 -0400 ---------------------------------------------------------------------- .../elasticsearch/QueryElasticsearchHttp.java | 80 +++- .../TestQueryElasticsearchHttp.java | 20 + .../TestQueryElasticsearchHttpNoHits.java | 363 +++++++++++++++++++ 3 files changed, 452 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/45bc1f1b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java index 0f6ec46..15ac65d 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java +++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java @@ -16,7 +16,10 @@ */ package org.apache.nifi.processors.elasticsearch; +import static org.apache.nifi.flowfile.attributes.CoreAttributes.MIME_TYPE; + import com.fasterxml.jackson.databind.JsonNode; +import java.util.Arrays; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; import okhttp3.Response; @@ -31,6 +34,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -69,8 +73,10 @@ import java.util.stream.Stream; + "To retrieve more records, use the ScrollElasticsearchHttp processor.") @WritesAttributes({ @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"), + @WritesAttribute(attribute = "es.query.hitcount", description = "The number of hits for a query"), @WritesAttribute(attribute = "es.id", description = "The Elasticsearch document identifier"), @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), + @WritesAttribute(attribute = "es.query.url", description = "The Elasticsearch query that was built"), @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"), @WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of " + "each result will be placed into corresponding attributes with this prefix.") }) @@ -81,12 +87,21 @@ import java.util.stream.Stream; description = "Adds the specified property name/value as a 
query parameter in the Elasticsearch URL used for processing") public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + public enum QueryInfoRouteStrategy { + NEVER, + ALWAYS, + NOHIT + } + private static final String FROM_QUERY_PARAM = "from"; public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content"; public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes"; private static final String ATTRIBUTE_PREFIX = "es.result."; + static final AllowableValue ALWAYS = new AllowableValue(QueryInfoRouteStrategy.ALWAYS.name(), "Always", "Always route Query Info"); + static final AllowableValue NEVER = new AllowableValue(QueryInfoRouteStrategy.NEVER.name(), "Never", "Never route Query Info"); + static final AllowableValue NO_HITS = new AllowableValue(QueryInfoRouteStrategy.NOHIT.name(), "No Hits", "Route Query Info if the Query returns no hits"); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description( @@ -107,6 +122,13 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + "based on the processor properties and the results of the fetch operation.") .build(); + public static final Relationship REL_QUERY_INFO = new Relationship.Builder() + .name("query-info") + .description( + "Depending on the setting of the Routing Strategy for Query Info property, a FlowFile is routed to this relationship with " + + "the incoming FlowFile's attributes (if present), the number of hits, and the Elasticsearch query") + .build(); + public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() .name("query-es-query") .displayName("Query") @@ -193,16 +215,21 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - private static final Set<Relationship> relationships; + public static final PropertyDescriptor ROUTING_QUERY_INFO_STRATEGY = new 
PropertyDescriptor.Builder() + .name("routing-query-info-strategy") + .displayName("Routing Strategy for Query Info") + .description("Specifies when to generate and route Query Info after a successful query") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues(ALWAYS, NEVER, NO_HITS) + .defaultValue(NEVER.getValue()) + .required(false) + .build(); + + private volatile Set<Relationship> relationships = new HashSet<>(Arrays.asList(new Relationship[] {REL_SUCCESS, REL_FAILURE, REL_RETRY})); private static final List<PropertyDescriptor> propertyDescriptors; + private QueryInfoRouteStrategy queryInfoRouteStrategy = QueryInfoRouteStrategy.NEVER; static { - final Set<Relationship> _rels = new HashSet<>(); - _rels.add(REL_SUCCESS); - _rels.add(REL_FAILURE); - _rels.add(REL_RETRY); - relationships = Collections.unmodifiableSet(_rels); - final List<PropertyDescriptor> descriptors = new ArrayList<>(); descriptors.add(ES_URL); descriptors.add(PROP_SSL_CONTEXT_SERVICE); @@ -218,6 +245,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { descriptors.add(SORT); descriptors.add(LIMIT); descriptors.add(TARGET); + descriptors.add(ROUTING_QUERY_INFO_STRATEGY); propertyDescriptors = Collections.unmodifiableList(descriptors); } @@ -238,6 +266,23 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { } @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + + if (ROUTING_QUERY_INFO_STRATEGY.equals(descriptor)) { + final Set<Relationship> relationshipSet = new HashSet<>(); + relationshipSet.add(REL_SUCCESS); + relationshipSet.add(REL_FAILURE); + relationshipSet.add(REL_RETRY); + + if (ALWAYS.getValue().equalsIgnoreCase(newValue) || NO_HITS.getValue().equalsIgnoreCase(newValue)) { + relationshipSet.add(REL_QUERY_INFO); + } + this.queryInfoRouteStrategy = QueryInfoRouteStrategy.valueOf(newValue); + this.relationships = 
relationshipSet; + } + } + + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { @@ -281,7 +326,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { final ComponentLog logger = getLogger(); int fromIndex = 0; - int numResults; + int numResults = 0; try { logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index, docType, @@ -305,10 +350,11 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl, username, password, "GET", null); numResults = this.getPage(getResponse, queryUrl, context, session, flowFile, - logger, startNanos, targetIsContent); + logger, startNanos, targetIsContent, numResults); fromIndex += pageSize; getResponse.close(); - } while (numResults > 0 && !hitLimit); + } + while (numResults > 0 && !hitLimit); if (flowFile != null) { session.remove(flowFile); @@ -341,7 +387,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { private int getPage(final Response getResponse, final URL url, final ProcessContext context, final ProcessSession session, FlowFile flowFile, final ComponentLog logger, - final long startNanos, boolean targetIsContent) + final long startNanos, boolean targetIsContent, int priorResultCount) throws IOException { List<FlowFile> page = new ArrayList<>(); final int statusCode = getResponse.code(); @@ -352,6 +398,17 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes)); JsonNode hits = responseJson.get("hits").get("hits"); + // if there are no hits, and there have never been any hits in this run ( priorResultCount ) and + // we are in NOHIT or ALWAYS, send the query info + if ( (hits.size() == 0 && priorResultCount == 0 && queryInfoRouteStrategy == QueryInfoRouteStrategy.NOHIT) + 
|| queryInfoRouteStrategy == QueryInfoRouteStrategy.ALWAYS) { + FlowFile queryInfo = flowFile == null ? session.create() : session.create(flowFile); + session.putAttribute(queryInfo, "es.query.url", url.toExternalForm()); + session.putAttribute(queryInfo, "es.query.hitcount", String.valueOf(hits.size())); + session.putAttribute(queryInfo, MIME_TYPE.key(), "application/json"); + session.transfer(queryInfo,REL_QUERY_INFO); + } + for(int i = 0; i < hits.size(); i++) { JsonNode hit = hits.get(i); String retrievedId = hit.get("_id").asText(); @@ -369,6 +426,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { documentFlowFile = session.putAttribute(documentFlowFile, "es.id", retrievedId); documentFlowFile = session.putAttribute(documentFlowFile, "es.index", retrievedIndex); documentFlowFile = session.putAttribute(documentFlowFile, "es.type", retrievedType); + documentFlowFile = session.putAttribute(documentFlowFile, "es.query.url", url.toExternalForm()); if (targetIsContent) { documentFlowFile = session.putAttribute(documentFlowFile, "filename", retrievedId); http://git-wip-us.apache.org/repos/asf/nifi/blob/45bc1f1b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java index 1b07d22..2863264 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java +++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java @@ -78,6 +78,25 @@ public class TestQueryElasticsearchHttp { } @Test + public void testQueryElasticsearchOnTrigger_withInput_withQueryInAttrs() throws IOException { + runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.QUERY, + "source:Twitter AND identifier:\"${identifier}\""); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2"); + runner.assertValid(); + + runAndVerifySuccess(true); + } + + @Test public void testQueryElasticsearchOnTrigger_withInput_EL() throws IOException { runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}"); @@ -161,6 +180,7 @@ public class TestQueryElasticsearchHttp { out.assertAttributeEquals("filename", "abc-97b-ASVsZu_" + "vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3"); } + out.assertAttributeExists("es.query.url"); } // By default, 3 files should go to Success http://git-wip-us.apache.org/repos/asf/nifi/blob/45bc1f1b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java new file mode 100644 index 0000000..862aead --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java @@ -0,0 +1,363 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.elasticsearch;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Test;
import org.mockito.stubbing.OngoingStubbing;

/**
 * Exercises the {@code query-info} relationship of {@link QueryElasticsearchHttp} for every
 * combination of "query returns hits / returns no hits" and routing strategy
 * ({@code NOHIT}, {@code NEVER}, {@code ALWAYS}).
 *
 * <p>Elasticsearch itself is replaced by a mocked {@link OkHttpClient} that replays canned JSON
 * pages loaded from the test classpath.
 */
public class TestQueryElasticsearchHttpNoHits {

    private static final String ES_URL = "http://127.0.0.1:9200";
    private static final String QUERY = "source:Twitter AND identifier:\"${identifier}\"";
    private static final String EXPECTED_QUERY_URL_PREFIX =
            "http://127.0.0.1:9200/doc/status/_search?q=source:Twitter%20AND%20identifier:%22%22&size=2";

    private TestRunner runner;

    @After
    public void teardown() {
        runner = null;
    }

    @Test
    public void testQueryElasticsearchOnTrigger_NoHits_NoHits() {
        configureRunner(false, QueryElasticsearchHttp.QueryInfoRouteStrategy.NOHIT);
        runAndVerify(0, 1, 0, true);
    }

    @Test
    public void testQueryElasticsearchOnTrigger_NoHits_Never() {
        configureRunner(false, QueryElasticsearchHttp.QueryInfoRouteStrategy.NEVER);
        runAndVerify(0, 0, 0, true);
    }

    @Test
    public void testQueryElasticsearchOnTrigger_NoHits_Always() {
        configureRunner(false, QueryElasticsearchHttp.QueryInfoRouteStrategy.ALWAYS);
        runAndVerify(0, 1, 0, true);
    }

    @Test
    public void testQueryElasticsearchOnTrigger_Hits_NoHits() {
        configureRunner(true, QueryElasticsearchHttp.QueryInfoRouteStrategy.NOHIT);
        runAndVerify(3, 0, 0, true);
    }

    @Test
    public void testQueryElasticsearchOnTrigger_Hits_Never() {
        configureRunner(true, QueryElasticsearchHttp.QueryInfoRouteStrategy.NEVER);
        runAndVerify(3, 0, 0, true);
    }

    @Test
    public void testQueryElasticsearchOnTrigger_Hits_Always() {
        configureRunner(true, QueryElasticsearchHttp.QueryInfoRouteStrategy.ALWAYS);
        // One query-info FlowFile per fetched page; the first page carries two hits.
        runAndVerify(3, 3, 2, true);
    }

    /**
     * Builds a fresh {@link TestRunner} around the mock-backed processor and applies the property
     * set shared by every test, checking validity after each step.
     *
     * @param useHitPages {@code true} to replay the three-page result set, {@code false} to replay
     *                    only the empty page
     * @param strategy    value assigned to {@link QueryElasticsearchHttp#ROUTING_QUERY_INFO_STRATEGY}
     */
    private void configureRunner(final boolean useHitPages,
            final QueryElasticsearchHttp.QueryInfoRouteStrategy strategy) {
        runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor(useHitPages));
        runner.setValidateExpressionUsage(true);
        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, ES_URL);

        // The configuration stays invalid until the query itself has been supplied.
        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
        runner.assertNotValid();
        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
        runner.assertNotValid();
        runner.setProperty(QueryElasticsearchHttp.QUERY, QUERY);
        runner.assertValid();
        runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
        runner.assertValid();
        runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, strategy.name());
        runner.assertValid();

        runner.setIncomingConnection(false);
    }

    /**
     * Triggers the processor once and checks what was routed where.
     *
     * @param expectedResults          number of FlowFiles expected on {@code success}
     * @param expectedQueryInfoResults number of FlowFiles expected on {@code query-info}
     * @param expectedHits             {@code es.query.hitcount} expected on the first
     *                                 {@code query-info} FlowFile
     * @param targetIsContent          whether attribute-level assertions should be made
     */
    private void runAndVerify(final int expectedResults, final int expectedQueryInfoResults,
            final int expectedHits, final boolean targetIsContent) {
        // NOTE(review): the runner is configured with no incoming connection and the expected URL
        // below contains an empty identifier, so this enqueued FlowFile does not appear to feed
        // the query -- confirm whether the enqueue is still needed.
        runner.enqueue("blah".getBytes(StandardCharsets.UTF_8),
                Collections.singletonMap("identifier", "28039652140"));

        // A single trigger pages through every canned response.
        runner.run(1, true, true);

        runner.assertTransferCount(QueryElasticsearchHttp.REL_QUERY_INFO, expectedQueryInfoResults);
        if (expectedQueryInfoResults > 0) {
            final MockFlowFile queryInfo =
                    runner.getFlowFilesForRelationship(QueryElasticsearchHttp.REL_QUERY_INFO).get(0);
            assertNotNull(queryInfo);
            if (targetIsContent) {
                queryInfo.assertAttributeEquals("es.query.hitcount", String.valueOf(expectedHits));
                assertTrue(queryInfo.getAttribute("es.query.url").startsWith(EXPECTED_QUERY_URL_PREFIX));
            }
        }

        runner.assertTransferCount(QueryElasticsearchHttp.REL_SUCCESS, expectedResults);
        if (expectedResults > 0) {
            final MockFlowFile document =
                    runner.getFlowFilesForRelationship(QueryElasticsearchHttp.REL_SUCCESS).get(0);
            assertNotNull(document);
            if (targetIsContent) {
                document.assertAttributeEquals("filename", "abc-97b-ASVsZu_" + "vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3");
            }
        }
    }

    /**
     * Convenience overload for the no-hit case: nothing on {@code success}, one
     * {@code query-info} FlowFile reporting a hit count of zero.
     */
    private void runAndVerify(final boolean targetIsContent) {
        runAndVerify(0, 1, 0, targetIsContent);
    }

    /**
     * A test class that extends the processor in order to inject/mock behavior.
     */
    private static class QueryElasticsearchHttpTestProcessor extends QueryElasticsearchHttp {
        private Exception exceptionToThrow = null;
        private OkHttpClient client;
        private final int goodStatusCode = 200;
        private final String goodStatusMessage = "OK";

        private int badStatusCode;
        private String badStatusMessage;
        // 1-based index of the call that should answer with the "bad" status; 0 means none.
        private int runNumber;

        private final boolean useHitPages;

        // query-page3 has no hits
        private final List<String> noHitPages = Collections.singletonList(getDoc("query-page3.json"));
        private final List<String> hitPages = Arrays.asList(getDoc("query-page1.json"),
                getDoc("query-page2.json"), getDoc("query-page3.json"));

        private String expectedParam = null;

        public QueryElasticsearchHttpTestProcessor() {
            this(false);
        }

        public QueryElasticsearchHttpTestProcessor(final boolean useHitPages) {
            this.useHitPages = useHitPages;
        }

        public void setExceptionToThrow(final Exception exceptionToThrow) {
            this.exceptionToThrow = exceptionToThrow;
        }

        /**
         * Sets the status code and message for the 1st query.
         *
         * @param code
         *            The status code to return
         * @param message
         *            The status message
         */
        void setStatus(final int code, final String message) {
            this.setStatus(code, message, 1);
        }

        /**
         * Sets a query parameter (name=value) expected to be at the end of the URL for the query operation.
         *
         * @param param
         *            The parameter to expect
         */
        void setExpectedParam(final String param) {
            expectedParam = param;
        }

        /**
         * Sets the status code and message for the runNumber-th query.
         *
         * @param code
         *            The status code to return
         * @param message
         *            The status message
         * @param runNumber
         *            The run number for which to set this status
         */
        void setStatus(final int code, final String message, final int runNumber) {
            badStatusCode = code;
            badStatusMessage = message;
            this.runNumber = runNumber;
        }

        /**
         * Replaces the real HTTP client with a mock whose successive {@code newCall} invocations
         * answer with the configured pages, in order.
         */
        @Override
        protected void createElasticsearchClient(final ProcessContext context) throws ProcessException {
            client = mock(OkHttpClient.class);

            OngoingStubbing<Call> stub = when(client.newCall(any(Request.class)));
            final List<String> pages = useHitPages ? hitPages : noHitPages;

            for (int i = 0; i < pages.size(); i++) {
                final boolean useBadStatus = runNumber == i + 1;
                stub = mockReturnDocument(stub, pages.get(i),
                        useBadStatus ? badStatusCode : goodStatusCode,
                        useBadStatus ? badStatusMessage : goodStatusMessage);
            }
        }

        /**
         * Chains one more answer onto {@code stub}: a {@link Call} whose {@code execute()} either
         * returns a response wrapping {@code document} or throws the configured exception.
         */
        private OngoingStubbing<Call> mockReturnDocument(final OngoingStubbing<Call> stub,
                final String document, final int statusCode, final String statusMessage) {
            return stub.thenAnswer(invocation -> {
                final Request realRequest = (Request) invocation.getArguments()[0];
                assertTrue(expectedParam == null || realRequest.url().toString().endsWith(expectedParam));

                final Response mockResponse = new Response.Builder()
                        .request(realRequest)
                        .protocol(Protocol.HTTP_1_1)
                        .code(statusCode)
                        .message(statusMessage)
                        .body(ResponseBody.create(MediaType.parse("application/json"), document))
                        .build();

                final Call call = mock(Call.class);
                if (exceptionToThrow != null) {
                    when(call.execute()).thenThrow(exceptionToThrow);
                } else {
                    when(call.execute()).thenReturn(mockResponse);
                }
                return call;
            });
        }

        // NOTE(review): presumably overrides the base-class accessor so the processor picks up the
        // mock; the original carried no @Override, so none is added here -- confirm against
        // AbstractElasticsearchHttpProcessor.
        protected OkHttpClient getClient() {
            return client;
        }
    }

    /**
     * Loads a canned JSON document from the test classpath as UTF-8 text.
     *
     * <p>Fails fast instead of returning an empty string, so a missing or unreadable fixture shows
     * up as such rather than as a confusing JSON parse failure later in the test.
     *
     * @param filename classpath-relative resource name
     * @return the resource content
     * @throws IllegalStateException if the resource is not on the classpath
     * @throws UncheckedIOException  if the resource cannot be read
     */
    private static String getDoc(final String filename) {
        try (InputStream in = QueryElasticsearchHttp.class.getClassLoader().getResourceAsStream(filename)) {
            if (in == null) {
                throw new IllegalStateException("Test resource not found on classpath: " + filename);
            }
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException("Error reading document " + filename, e);
        }
    }
}
