Repository: nifi
Updated Branches:
  refs/heads/master 69a08e78c -> 6b5015e39


NIFI-4218: Dynamic properties as query parameters in ESHttp processors

This closes #2049.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6b5015e3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6b5015e3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6b5015e3

Branch: refs/heads/master
Commit: 6b5015e39b4233cf230151fb45bebcb21df03730
Parents: 69a08e7
Author: Matt Burgess <[email protected]>
Authored: Wed Aug 2 09:41:12 2017 -0400
Committer: Bryan Bende <[email protected]>
Committed: Thu Aug 17 14:29:05 2017 -0400

----------------------------------------------------------------------
 .../AbstractElasticsearchHttpProcessor.java     | 17 +++++++++
 .../elasticsearch/FetchElasticsearchHttp.java   | 25 ++++++++++---
 .../elasticsearch/PutElasticsearchHttp.java     | 29 ++++++++++-----
 .../PutElasticsearchHttpRecord.java             | 26 +++++++++-----
 .../elasticsearch/QueryElasticsearchHttp.java   | 26 ++++++++++----
 .../elasticsearch/ScrollElasticsearchHttp.java  | 29 ++++++++++-----
 .../TestFetchElasticsearchHttp.java             | 35 +++++++++++++++++-
 .../elasticsearch/TestPutElasticsearchHttp.java | 36 ++++++++++++++++++-
 .../TestPutElasticsearchHttpRecord.java         | 38 ++++++++++++++++++++
 .../TestQueryElasticsearchHttp.java             | 34 ++++++++++++++++--
 .../TestScrollElasticsearchHttp.java            | 36 +++++++++++++++++--
 11 files changed, 288 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6b5015e3/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
index d67ce6c..df986b1 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -50,6 +50,12 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public abstract class AbstractElasticsearchHttpProcessor extends 
AbstractElasticsearchProcessor {
 
+    static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
+    static final String QUERY_QUERY_PARAM = "q";
+    static final String SORT_QUERY_PARAM = "sort";
+    static final String SIZE_QUERY_PARAM = "size";
+
+
     public static final PropertyDescriptor ES_URL = new 
PropertyDescriptor.Builder()
             .name("elasticsearch-http-url")
             .displayName("Elasticsearch URL")
@@ -98,6 +104,17 @@ public abstract class AbstractElasticsearchHttpProcessor 
extends AbstractElastic
     private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = 
new AtomicReference<>();
 
     @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String 
propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(true)
+                .dynamic(true)
+                .build();
+    }
+
+    @Override
     protected void createElasticsearchClient(ProcessContext context) throws 
ProcessException {
         okHttpClientAtomicReference.set(null);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b5015e3/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
index 06be4d5..441cc10 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
@@ -21,6 +21,7 @@ import okhttp3.OkHttpClient;
 import okhttp3.Response;
 import okhttp3.ResponseBody;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -37,9 +38,9 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.codehaus.jackson.JsonNode;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -47,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -65,10 +67,13 @@ import java.util.stream.Stream;
         @WritesAttribute(attribute = "es.index", description = "The 
Elasticsearch index containing the document"),
         @WritesAttribute(attribute = "es.type", description = "The 
Elasticsearch document type")
 })
+@DynamicProperty(
+        name = "A URL query parameter",
+        value = "The value to set it to",
+        supportsExpressionLanguage = true,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing")
 public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor 
{
 
-    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
-
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All FlowFiles that are read from Elasticsearch are 
routed to this relationship.")
@@ -212,7 +217,7 @@ public class FetchElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
 
             // read the url property from the context
             final String urlstr = 
StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
-            final URL url = buildRequestURL(urlstr, docId, index, docType, 
fields);
+            final URL url = buildRequestURL(urlstr, docId, index, docType, 
fields, context);
             final long startNanos = System.nanoTime();
 
             getResponse = sendRequestToElasticsearch(okHttpClient, url, 
username, password, "GET", null);
@@ -304,7 +309,7 @@ public class FetchElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
         }
     }
 
-    private URL buildRequestURL(String baseUrl, String docId, String index, 
String type, String fields) throws MalformedURLException {
+    private URL buildRequestURL(String baseUrl, String docId, String index, 
String type, String fields, ProcessContext context) throws 
MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
@@ -317,6 +322,16 @@ public class FetchElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
             builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, 
trimmedFields);
         }
 
+        // Find the user-added properties and set them as query parameters on 
the URL
+        for (Map.Entry<PropertyDescriptor, String> property : 
context.getProperties().entrySet()) {
+            PropertyDescriptor pd = property.getKey();
+            if (pd.isDynamic()) {
+                if (property.getValue() != null) {
+                    builder.addQueryParameter(pd.getName(), 
context.getProperty(pd).evaluateAttributeExpressions().getValue());
+                }
+            }
+        }
+
         return builder.build().url();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b5015e3/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 70884be..52892a0 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -16,12 +16,14 @@
  */
 package org.apache.nifi.processors.elasticsearch;
 
+import okhttp3.HttpUrl;
 import okhttp3.MediaType;
 import okhttp3.OkHttpClient;
 import okhttp3.RequestBody;
 import okhttp3.Response;
 import okhttp3.ResponseBody;
 import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -39,13 +41,12 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.apache.nifi.util.StringUtils;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.node.ArrayNode;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -54,6 +55,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.commons.lang3.StringUtils.trimToEmpty;
@@ -65,6 +67,11 @@ import static 
org.apache.commons.lang3.StringUtils.trimToEmpty;
 @Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", 
"put", "http"})
 @CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, 
using the specified parameters such as "
         + "the index to insert into and the type of the document.")
+@DynamicProperty(
+        name = "A URL query parameter",
+        value = "The value to set it to",
+        supportsExpressionLanguage = true,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing")
 public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
 
     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
@@ -223,14 +230,18 @@ public class PutElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
 
         final StringBuilder sb = new StringBuilder();
         final String baseUrl = 
trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
-        final URL url;
-        try {
-            url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + 
"_bulk");
-        } catch (MalformedURLException mue) {
-            // Since we have a URL validator, something has gone very wrong, 
throw a ProcessException
-            context.yield();
-            throw new ProcessException(mue);
+        HttpUrl.Builder urlBuilder = 
HttpUrl.parse(baseUrl).newBuilder().addPathSegment("_bulk");
+
+        // Find the user-added properties and set them as query parameters on 
the URL
+        for (Map.Entry<PropertyDescriptor, String> property : 
context.getProperties().entrySet()) {
+            PropertyDescriptor pd = property.getKey();
+            if (pd.isDynamic()) {
+                if (property.getValue() != null) {
+                    urlBuilder = urlBuilder.addQueryParameter(pd.getName(), 
context.getProperty(pd).evaluateAttributeExpressions().getValue());
+                }
+            }
         }
+        final URL url = urlBuilder.build().url();
 
         for (FlowFile file : flowFiles) {
             final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b5015e3/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 24d0057..c618f6c 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -16,11 +16,13 @@
  */
 package org.apache.nifi.processors.elasticsearch;
 
+import okhttp3.HttpUrl;
 import okhttp3.MediaType;
 import okhttp3.OkHttpClient;
 import okhttp3.RequestBody;
 import okhttp3.Response;
 import okhttp3.ResponseBody;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -66,7 +68,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.math.BigInteger;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -87,6 +88,11 @@ import static 
org.apache.commons.lang3.StringUtils.trimToEmpty;
         + "the index to insert into and the type of the document, as well as 
the operation type (index, upsert, delete, etc.). Note: The Bulk API is used to 
"
         + "send the records. This means that the entire contents of the 
incoming flow file are read into memory, and each record is transformed into a 
JSON document "
         + "which is added to a single HTTP request body. For very large flow 
files (files with a large number of records, e.g.), this could cause memory 
usage issues.")
+@DynamicProperty(
+        name = "A URL query parameter",
+        value = "The value to set it to",
+        supportsExpressionLanguage = true,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing")
 public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcessor {
 
     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
@@ -239,14 +245,18 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
         final ComponentLog logger = getLogger();
 
         final String baseUrl = 
trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
-        final URL url;
-        try {
-            url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + 
"_bulk");
-        } catch (MalformedURLException mue) {
-            // Since we have a URL validator, something has gone very wrong, 
throw a ProcessException
-            context.yield();
-            throw new ProcessException(mue);
+        HttpUrl.Builder urlBuilder = 
HttpUrl.parse(baseUrl).newBuilder().addPathSegment("_bulk");
+
+        // Find the user-added properties and set them as query parameters on 
the URL
+        for (Map.Entry<PropertyDescriptor, String> property : 
context.getProperties().entrySet()) {
+            PropertyDescriptor pd = property.getKey();
+            if (pd.isDynamic()) {
+                if (property.getValue() != null) {
+                    urlBuilder = urlBuilder.addQueryParameter(pd.getName(), 
context.getProperty(pd).evaluateAttributeExpressions().getValue());
+                }
+            }
         }
+        final URL url = urlBuilder.build().url();
 
         final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
         if (StringUtils.isEmpty(index)) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b5015e3/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index f65816e..b25f173 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.elasticsearch;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -33,6 +34,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -49,7 +51,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.codehaus.jackson.JsonNode;
 
 import okhttp3.HttpUrl;
@@ -73,13 +74,14 @@ import okhttp3.ResponseBody;
         @WritesAttribute(attribute = "es.type", description = "The 
Elasticsearch document type"),
         @WritesAttribute(attribute = "es.result.*", description = "If Target 
is 'Flow file attributes', the JSON attributes of "
                 + "each result will be placed into corresponding attributes 
with this prefix.") })
+@DynamicProperty(
+        name = "A URL query parameter",
+        value = "The value to set it to",
+        supportsExpressionLanguage = true,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing")
 public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor 
{
 
-    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
-    private static final String QUERY_QUERY_PARAM = "q";
-    private static final String SORT_QUERY_PARAM = "sort";
     private static final String FROM_QUERY_PARAM = "from";
-    private static final String SIZE_QUERY_PARAM = "size";
 
     public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
     public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file 
attributes";
@@ -281,7 +283,7 @@ public class QueryElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
                 }
 
                 final URL queryUrl = buildRequestURL(urlstr, query, index, 
docType, fields, sort,
-                        mPageSize, fromIndex);
+                        mPageSize, fromIndex, context);
 
                 final Response getResponse = 
sendRequestToElasticsearch(okHttpClient, queryUrl,
                         username, password, "GET", null);
@@ -403,7 +405,7 @@ public class QueryElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
     }
 
     private URL buildRequestURL(String baseUrl, String query, String index, 
String type, String fields,
-            String sort, int pageSize, int fromIndex) throws 
MalformedURLException {
+            String sort, int pageSize, int fromIndex, ProcessContext context) 
throws MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
@@ -425,6 +427,16 @@ public class QueryElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
             builder.addQueryParameter(SORT_QUERY_PARAM, trimmedFields);
         }
 
+        // Find the user-added properties and set them as query parameters on 
the URL
+        for (Map.Entry<PropertyDescriptor, String> property : 
context.getProperties().entrySet()) {
+            PropertyDescriptor pd = property.getKey();
+            if (pd.isDynamic()) {
+                if (property.getValue() != null) {
+                    builder.addQueryParameter(pd.getName(), 
context.getProperty(pd).evaluateAttributeExpressions().getValue());
+                }
+            }
+        }
+
         return builder.build().url();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b5015e3/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
index 0442bf7..91b9176 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.elasticsearch;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -32,6 +33,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.Stateful;
@@ -52,7 +54,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.codehaus.jackson.JsonNode;
 
 import okhttp3.HttpUrl;
@@ -73,18 +74,19 @@ import okhttp3.ResponseBody;
 @WritesAttributes({
         @WritesAttribute(attribute = "es.index", description = "The 
Elasticsearch index containing the document"),
         @WritesAttribute(attribute = "es.type", description = "The 
Elasticsearch document type") })
+@DynamicProperty(
+        name = "A URL query parameter",
+        value = "The value to set it to",
+        supportsExpressionLanguage = true,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing")
 @Stateful(description = "After each successful scroll page, the latest 
scroll_id is persisted in scrollId as input for the next scroll call.  "
         + "Once the entire query is complete, finishedQuery state will be set 
to true, and the processor will not execute unless this is cleared.", scopes = 
{ Scope.LOCAL })
 public class ScrollElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
 
     private static final String FINISHED_QUERY_STATE = "finishedQuery";
     private static final String SCROLL_ID_STATE = "scrollId";
-    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
-    private static final String QUERY_QUERY_PARAM = "q";
-    private static final String SORT_QUERY_PARAM = "sort";
     private static final String SCROLL_QUERY_PARAM = "scroll";
     private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
-    private static final String SIZE_QUERY_PARAM = "size";
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -249,7 +251,7 @@ public class ScrollElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor
                     .getValue());
             if (scrollId != null) {
                 final URL scrollurl = buildRequestURL(urlstr, query, index, 
docType, fields, sort,
-                        scrollId, pageSize, scroll);
+                        scrollId, pageSize, scroll, context);
                 final long startNanos = System.nanoTime();
 
                 final Response getResponse = 
sendRequestToElasticsearch(okHttpClient, scrollurl,
@@ -262,7 +264,7 @@ public class ScrollElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor
 
                 // read the url property from the context
                 final URL queryUrl = buildRequestURL(urlstr, query, index, 
docType, fields, sort,
-                        scrollId, pageSize, scroll);
+                        scrollId, pageSize, scroll, context);
                 final long startNanos = System.nanoTime();
 
                 final Response getResponse = 
sendRequestToElasticsearch(okHttpClient, queryUrl,
@@ -399,7 +401,7 @@ public class ScrollElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor
     }
 
     private URL buildRequestURL(String baseUrl, String query, String index, 
String type, String fields,
-            String sort, String scrollId, int pageSize, String scroll) throws 
MalformedURLException {
+            String sort, String scrollId, int pageSize, String scroll, 
ProcessContext context) throws MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
@@ -427,6 +429,17 @@ public class ScrollElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor
         }
         builder.addQueryParameter(SCROLL_QUERY_PARAM, scroll);
 
+        // Find the user-added properties and set them as query parameters on 
the URL
+        for (Map.Entry<PropertyDescriptor, String> property : 
context.getProperties().entrySet()) {
+            PropertyDescriptor pd = property.getKey();
+            if (pd.isDynamic()) {
+                if (property.getValue() != null) {
+                    builder.addQueryParameter(pd.getName(), 
context.getProperty(pd).evaluateAttributeExpressions().getValue());
+                }
+            }
+        }
+
+
         return builder.build().url();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b5015e3/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
index 346ead4..de56b49 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
@@ -44,6 +44,7 @@ import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -293,6 +294,33 @@ public class TestFetchElasticsearchHttp {
 
     }
 
+    @Test
+    public void testFetchElasticsearchOnTriggerQueryParameter() throws 
IOException {
+        FetchElasticsearchHttpTestProcessor p = new 
FetchElasticsearchHttpTestProcessor(true); // all docs are found
+        
p.setExpectedUrl("http://127.0.0.1:9200/doc/status/28039652140?_source_include=id&myparam=myvalue";);
+        runner = TestRunners.newTestRunner(p);
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200";);
+
+        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
+        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+        runner.setProperty(FetchElasticsearchHttp.FIELDS, "id");
+
+        // Set dynamic property, to be added to the URL as a query parameter
+        runner.setProperty("myparam", "myvalue");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
@@ -302,8 +330,8 @@ public class TestFetchElasticsearchHttp {
         OkHttpClient client;
         int statusCode = 200;
         String statusMessage = "OK";
-
         URL url = null;
+        String expectedUrl = null;
 
         FetchElasticsearchHttpTestProcessor(boolean documentExists) {
             this.documentExists = documentExists;
@@ -318,6 +346,10 @@ public class TestFetchElasticsearchHttp {
             statusMessage = message;
         }
 
+        void setExpectedUrl(String url) {
+            expectedUrl = url;
+        }
+
         @Override
         protected void createElasticsearchClient(ProcessContext context) 
throws ProcessException {
             client = mock(OkHttpClient.class);
@@ -327,6 +359,7 @@ public class TestFetchElasticsearchHttp {
                 @Override
                 public Call answer(InvocationOnMock invocationOnMock) throws 
Throwable {
                     Request realRequest = (Request) 
invocationOnMock.getArguments()[0];
+                    assertTrue((expectedUrl == null) || 
(expectedUrl.equals(realRequest.url().toString())));
                     StringBuilder sb = new 
StringBuilder("{\"_index\":\"randomuser.me\",\"_type\":\"user\",\"_id\":\"0\",\"_version\":2,");
                     if (documentExists) {
                         
sb.append("\"found\":true,\"_source\":{\"gender\":\"female\",\"name\":{\"title\":\"Ms\",\"first\":\"Joan\",\"last\":\"Smith\"}}");

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b5015e3/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
index a8575d4..36aa94e 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -38,9 +38,11 @@ import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.net.ConnectException;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -53,7 +55,7 @@ public class TestPutElasticsearchHttp {
     @Before
     public void once() throws IOException {
         ClassLoader classloader = 
Thread.currentThread().getContextClassLoader();
-        docExample = 
IOUtils.toString(classloader.getResourceAsStream("DocumentExample.json")).getBytes();
+        docExample = 
IOUtils.toString(classloader.getResourceAsStream("DocumentExample.json"), 
StandardCharsets.UTF_8).getBytes();
     }
 
     @After
@@ -322,6 +324,32 @@ public class TestPutElasticsearchHttp {
         assertNotNull(out);
     }
 
+    @Test
+    public void testPutElasticSearchOnTriggerQueryParameter() throws 
IOException {
+        PutElasticsearchTestProcessor p = new 
PutElasticsearchTestProcessor(false); // no failures
+        p.setExpectedUrl("http://127.0.0.1:9200/_bulk?pipeline=my-pipeline";);
+
+        runner = TestRunners.newTestRunner(p);
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200";);
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+
+        // Set dynamic property, to be added to the URL as a query parameter
+        runner.setProperty("pipeline", "my-pipeline");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 
1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
@@ -330,6 +358,7 @@ public class TestPutElasticsearchHttp {
         OkHttpClient client;
         int statusCode = 200;
         String statusMessage = "OK";
+        String expectedUrl = null;
 
         PutElasticsearchTestProcessor(boolean responseHasFailures) {
             this.responseHasFailures = responseHasFailures;
@@ -340,6 +369,10 @@ public class TestPutElasticsearchHttp {
             statusMessage = message;
         }
 
+        void setExpectedUrl(String url) {
+            expectedUrl = url;
+        }
+
         @Override
         protected void createElasticsearchClient(ProcessContext context) 
throws ProcessException {
             client = mock(OkHttpClient.class);
@@ -351,6 +384,7 @@ public class TestPutElasticsearchHttp {
                     final Call call = mock(Call.class);
                     if (statusCode != -1) {
                         Request realRequest = (Request) 
invocationOnMock.getArguments()[0];
+                        assertTrue((expectedUrl == null) || 
(expectedUrl.equals(realRequest.url().toString())));
                         StringBuilder sb = new StringBuilder("{\"took\": 1, 
\"errors\": \"");
                         sb.append(responseHasFailures);
                         sb.append("\", \"items\": [");

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b5015e3/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index e931236..75fb6ec 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -44,6 +44,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -332,6 +333,37 @@ public class TestPutElasticsearchHttpRecord {
         assertNotNull(out);
     }
 
+    @Test
+    public void testPutElasticSearchOnTriggerQueryParameter() throws 
IOException {
+        PutElasticsearchHttpRecordTestProcessor p = new 
PutElasticsearchHttpRecordTestProcessor(false); // no failures
+        p.setExpectedUrl("http://127.0.0.1:9200/_bulk?pipeline=my-pipeline";);
+        runner = TestRunners.newTestRunner(p);
+        generateTestData();
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200";);
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+
+        // Set dynamic property, to be added to the URL as a query parameter
+        runner.setProperty("pipeline", "my-pipeline");
+
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+        List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
+        assertNotNull(provEvents);
+        assertEquals(1, provEvents.size());
+        assertEquals(ProvenanceEventType.SEND, 
provEvents.get(0).getEventType());
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
@@ -340,6 +372,7 @@ public class TestPutElasticsearchHttpRecord {
         OkHttpClient client;
         int statusCode = 200;
         String statusMessage = "OK";
+        String expectedUrl = null;
 
         PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
             this.responseHasFailures = responseHasFailures;
@@ -350,6 +383,10 @@ public class TestPutElasticsearchHttpRecord {
             statusMessage = message;
         }
 
+        void setExpectedUrl(String url) {
+            expectedUrl = url;
+        }
+
         @Override
         protected void createElasticsearchClient(ProcessContext context) 
throws ProcessException {
             client = mock(OkHttpClient.class);
@@ -358,6 +395,7 @@ public class TestPutElasticsearchHttpRecord {
                 final Call call = mock(Call.class);
                 if (statusCode != -1) {
                     Request realRequest = (Request) 
invocationOnMock.getArguments()[0];
+                    assertTrue((expectedUrl == null) || 
(expectedUrl.equals(realRequest.url().toString())));
                     StringBuilder sb = new StringBuilder("{\"took\": 1, 
\"errors\": \"");
                     sb.append(responseHasFailures);
                     sb.append("\", \"items\": [");

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b5015e3/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
index ccd74fa..4789496 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
@@ -18,11 +18,13 @@ package org.apache.nifi.processors.elasticsearch;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -360,6 +362,22 @@ public class TestQueryElasticsearchHttp {
         runner.run(1, true, true);
     }
 
+    @Test
+    public void testQueryElasticsearchOnTrigger_withQueryParameters() throws 
IOException {
+        QueryElasticsearchHttpTestProcessor p = new 
QueryElasticsearchHttpTestProcessor();
+        p.setExpectedParam("myparam=myvalue");
+        runner = TestRunners.newTestRunner(p);
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200";);
+
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
+        // Set dynamic property, to be added to the URL as a query parameter
+        runner.setProperty("myparam", "myvalue");
+        runAndVerifySuccess(true);
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
@@ -376,6 +394,8 @@ public class TestQueryElasticsearchHttp {
         List<String> pages = Arrays.asList(getDoc("query-page1.json"), 
getDoc("query-page2.json"),
                 getDoc("query-page3.json"));
 
+        String expectedParam = null;
+
         public void setExceptionToThrow(Exception exceptionToThrow) {
             this.exceptionToThrow = exceptionToThrow;
         }
@@ -393,6 +413,16 @@ public class TestQueryElasticsearchHttp {
         }
 
         /**
+         * Sets an query parameter (name=value) expected to be at the end of 
the URL for the query operation
+         *
+         * @param param
+         *            The parameter to expect
+         */
+        void setExpectedParam(String param) {
+            expectedParam = param;
+        }
+
+        /**
          * Sets the status code and message for the runNumber-th query
          *
          * @param code
@@ -431,6 +461,7 @@ public class TestQueryElasticsearchHttp {
                 @Override
                 public Call answer(InvocationOnMock invocationOnMock) throws 
Throwable {
                     Request realRequest = (Request) 
invocationOnMock.getArguments()[0];
+                    assertTrue((expectedParam == null) || 
(realRequest.url().toString().endsWith(expectedParam)));
                     Response mockResponse = new Response.Builder()
                             .request(realRequest)
                             .protocol(Protocol.HTTP_1_1)
@@ -456,8 +487,7 @@ public class TestQueryElasticsearchHttp {
 
     private static String getDoc(String filename) {
         try {
-            return 
IOUtils.toString(QueryElasticsearchHttp.class.getClassLoader()
-                    .getResourceAsStream(filename));
+            return 
IOUtils.toString(QueryElasticsearchHttp.class.getClassLoader().getResourceAsStream(filename),
 StandardCharsets.UTF_8);
         } catch (IOException e) {
             System.out.println("Error reading document " + filename);
             return "";

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b5015e3/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
index a1a4e8d..1e687f1 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
@@ -17,11 +17,13 @@
 package org.apache.nifi.processors.elasticsearch;
 
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -316,6 +318,24 @@ public class TestScrollElasticsearchHttp {
         runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 1);
     }
 
+    @Test
+    public void testScrollElasticsearchOnTrigger_withQueryParameter() throws 
IOException {
+        ScrollElasticsearchHttpTestProcessor p = new 
ScrollElasticsearchHttpTestProcessor();
+        p.setExpectedParam("myparam=myvalue");
+        runner = TestRunners.newTestRunner(p);
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200";);
+
+        runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
+        runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:WZ");
+        runner.setProperty(ScrollElasticsearchHttp.PAGE_SIZE, "2");
+        // Set dynamic property, to be added to the URL as a query parameter
+        runner.setProperty("myparam", "myvalue");
+        runner.setIncomingConnection(false);
+        runAndVerifySuccess();
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
@@ -332,6 +352,8 @@ public class TestScrollElasticsearchHttp {
         List<String> pages = Arrays.asList(getDoc("scroll-page1.json"),
                 getDoc("scroll-page2.json"), getDoc("scroll-page3.json"));
 
+        String expectedParam = null;
+
         public void setExceptionToThrow(Exception exceptionToThrow) {
             this.exceptionToThrow = exceptionToThrow;
         }
@@ -364,6 +386,16 @@ public class TestScrollElasticsearchHttp {
             this.runNumber = runNumber;
         }
 
+        /**
+         * Sets an query parameter (name=value) expected to be at the end of 
the URL for the query operation
+         *
+         * @param param
+         *            The parameter to expect
+         */
+        void setExpectedParam(String param) {
+            expectedParam = param;
+        }
+
         @Override
         protected void createElasticsearchClient(ProcessContext context) 
throws ProcessException {
             client = mock(OkHttpClient.class);
@@ -387,6 +419,7 @@ public class TestScrollElasticsearchHttp {
                 @Override
                 public Call answer(InvocationOnMock invocationOnMock) throws 
Throwable {
                     Request realRequest = (Request) 
invocationOnMock.getArguments()[0];
+                    assertTrue((expectedParam == null) || 
(realRequest.url().toString().endsWith(expectedParam)));
                     Response mockResponse = new Response.Builder()
                             .request(realRequest)
                             .protocol(Protocol.HTTP_1_1)
@@ -412,8 +445,7 @@ public class TestScrollElasticsearchHttp {
 
     private static String getDoc(String filename) {
         try {
-            return 
IOUtils.toString(ScrollElasticsearchHttp.class.getClassLoader()
-                    .getResourceAsStream(filename));
+            return 
IOUtils.toString(ScrollElasticsearchHttp.class.getClassLoader().getResourceAsStream(filename),
 StandardCharsets.UTF_8);
         } catch (IOException e) {
             System.out.println("Error reading document " + filename);
             return "";

Reply via email to