Repository: nifi
Updated Branches:
  refs/heads/master 4e09a03f8 -> 66783c18b
NIFI-5427: Updating ScrollElasticsearchHttp to use POST, supporting ES6 This closes #2890 Signed-off-by: Mike Thomsen <mikerthom...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/66783c18 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/66783c18 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/66783c18 Branch: refs/heads/master Commit: 66783c18b24b1c6b1cfd662c58ca9df1e60b866e Parents: 4e09a03 Author: Joe Gresock <joseph.gres...@lmco.com> Authored: Fri Jul 13 16:29:45 2018 +0000 Committer: Mike Thomsen <mikerthom...@gmail.com> Committed: Wed Jul 18 13:50:53 2018 -0400 ---------------------------------------------------------------------- .../AbstractElasticsearchHttpProcessor.java | 2 ++ .../elasticsearch/ScrollElasticsearchHttp.java | 13 +++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/66783c18/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java index 0b40c75..8cb34a1 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java +++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java @@ -250,6 +250,8 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic requestBuilder = requestBuilder.get(); } else if ("put".equalsIgnoreCase(verb)) { requestBuilder = requestBuilder.put(body); + } else if ("post".equalsIgnoreCase(verb)) { + requestBuilder = requestBuilder.post(body); } else { throw new IllegalArgumentException("Elasticsearch REST API verb not supported by this processor: " + verb); } http://git-wip-us.apache.org/repos/asf/nifi/blob/66783c18/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java index e90af79..01e5ae1 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java @@ -18,7 +18,9 @@ package org.apache.nifi.processors.elasticsearch; import com.fasterxml.jackson.databind.JsonNode; import okhttp3.HttpUrl; +import okhttp3.MediaType; import okhttp3.OkHttpClient; +import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; import org.apache.commons.lang3.StringUtils; @@ -86,7 +88,6 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor 
private static final String FINISHED_QUERY_STATE = "finishedQuery"; private static final String SCROLL_ID_STATE = "scrollId"; private static final String SCROLL_QUERY_PARAM = "scroll"; - private static final String SCROLL_ID_QUERY_PARAM = "scroll_id"; public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -260,8 +261,13 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor scrollId, pageSize, scroll, context); final long startNanos = System.nanoTime(); + final String scrollBody = String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", scroll, + scrollId); + + final RequestBody body = RequestBody.create(MediaType.parse("application/json"), scrollBody); + final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl, - username, password, "GET", null); + username, password, "POST", body); this.getPage(getResponse, scrollurl, context, session, flowFile, logger, startNanos); getResponse.close(); } else { @@ -415,7 +421,6 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor if (!StringUtils.isEmpty(scrollId)) { builder.addPathSegment("_search"); builder.addPathSegment("scroll"); - builder.addQueryParameter(SCROLL_ID_QUERY_PARAM, scrollId); } else { builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index); if (!StringUtils.isEmpty(type)) { @@ -432,8 +437,8 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(",")); builder.addQueryParameter(SORT_QUERY_PARAM, trimmedFields); } + builder.addQueryParameter(SCROLL_QUERY_PARAM, scroll); } - builder.addQueryParameter(SCROLL_QUERY_PARAM, scroll); // Find the user-added properties and set them as query parameters on the URL for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {